spark MetricsSystem 完全揭秘

MetricsSystem系統(tǒng),顧名思義就是來度量系統(tǒng)的各項(xiàng)指標(biāo)的,比如說可以度量driver,worker,executor端的jvm相關(guān)的信息,來測(cè)試服務(wù)器的性能。spark中的MetricsSystem底層使用的是第三方的庫metrics,metrics是一套開源的度量系統(tǒng),可以用來計(jì)數(shù)、監(jiān)控某項(xiàng)指標(biāo)的均值等,MetricsSystem基于metrics做了一層封裝,但是系統(tǒng)結(jié)構(gòu)和metrics大體相同,理解了metrics后MetricsSystem也比較好理解

一:開源的度量系統(tǒng)Metrics

先來看看兩個(gè)例子

  1. gauges 用來監(jiān)控某個(gè)指標(biāo)的瞬時(shí)值
/**
 * demo for guages
 *
 * @author xiongmao
 * @create 2019-02-18 5:09 PM
 */
public class GaugesDemo {
    private static final MetricRegistry metrics = new MetricRegistry();
    private static Queue<String> queue = new LinkedBlockingDeque<String>();
    // 定義reporter,作為度量值的接受端,用于處理接受到的度量值,這里使用的是ConsoleReporter,也就是將度量值
    // 打印到控制臺(tái)
    // 這里調(diào)用了ConsolreReporter的forRegistry,主要是講reporter注冊(cè)到MetricRegistry中
    private static ConsoleReporter reporter = ConsoleReporter.forRegistry(metrics).build();

    public static void main(String[] args)throws Exception {
        // 啟動(dòng)reporter,開始處理接受到的度量結(jié)果
        reporter.start(3, TimeUnit.SECONDS);
        
        // 定義Gauge,也就是度量系統(tǒng)的檢測(cè)端,其實(shí)就是metric,Gauge繼承了metric
        Gauge<Integer> gauge = new Gauge<Integer>() {
            @Override
            public Integer getValue() {
                return queue.size();
            }
        };
        // 將metics注冊(cè)到Metrics中,這樣reporter就知道接受哪個(gè)metrics中的數(shù)據(jù)了
        metrics.register(MetricRegistry.name(GaugesDemo.class,"pending-job", "size"), gauge);

//        JmxReporter jmxReporter = JmxReporter.forRegistry(metrics).build();
//        jmxReporter.start();

        for (int i = 0; i < 20; i++) {
            queue.add("a");
            Thread.sleep(1000);
        }
    }
}

  1. Counter 指標(biāo)計(jì)數(shù)
/**
 * counter demo
 * 
 * @author xiongmao
 * @create 2019-02-19 1:49 PM
 */
public class CounterDemo {
    private static final MetricRegistry metrics = new MetricRegistry();
    // 定義reporter,并注冊(cè)到MetricRegistry中
    private static final ConsoleReporter reporter = ConsoleReporter.forRegistry(metrics).build();
    // 定義Counter,同樣counter也繼承了Metrics
    private static final Counter pendingJobs = new Counter();

    public static void add(String str) {
        pendingJobs.inc();
    }
    public static void main(String[] args)throws Exception {
        // 注冊(cè)metrics到MetricRegistry,與reporter對(duì)應(yīng)
        metrics.register(MetricRegistry.name(CounterDemo.class,"pending-jobs"),pendingJobs);
        reporter.start(3, TimeUnit.SECONDS);
        while (true) {
            add("1");
            Thread.sleep(1000);
        }
    }
}

上面兩個(gè)例子可以看出Metrics中 MetricRegistry作為對(duì)量系統(tǒng)的中樞大腦,metrics和reporter必須要注冊(cè)到同一個(gè)MetricRegistry中才能協(xié)同工作,可以猜想一個(gè)reporter可以接受多個(gè)metrics的度量結(jié)果,一個(gè)metrics的度量結(jié)果可以被多個(gè)reporter接受,只要這些reporter和metrics注冊(cè)到同一個(gè)MetricRegistry中即可

  1. 多個(gè)reporter和多個(gè)metrics注冊(cè)到同一個(gè)MetricRegistry中
/**
 * counter demo
 * 同一個(gè)MetricRegistry 可以注冊(cè)多個(gè)metric和多個(gè)reporter,多個(gè)metrics的度量輸出會(huì)被每一個(gè)reporter接受
 * @author xiongmao
 * @create 2019-02-19 1:49 PM
 */
public class CounterDemo {
    private static final MetricRegistry metrics = new MetricRegistry();

    private static final ConsoleReporter reporter = ConsoleReporter.forRegistry(metrics).build();

    private static final ConsoleReporter reporter1 = ConsoleReporter.forRegistry(metrics).build();

    private static Queue<String> queue = new LinkedBlockingDeque<String>();

    private static final Counter pendingJobs = new Counter();

    public static void add(String str) {
        pendingJobs.inc();
    }
    public static void main(String[] args)throws Exception {
        Gauge<Integer> gauge = new Gauge<Integer>() {
            @Override
            public Integer getValue() {
                return queue.size();
            }
        };

        metrics.register(MetricRegistry.name(CounterDemo.class,"pending-jobs"),pendingJobs);
        metrics.register(MetricRegistry.name(CounterDemo.class,"gauge"),gauge);

        reporter.start(3, TimeUnit.SECONDS);
        reporter1.start(3, TimeUnit.SECONDS);
        while (true) {
            add("1");
            queue.add("1");
            Thread.sleep(1000);
        }
    }
}

  1. Metirc、reporter、metricRegistry關(guān)系圖解

Metric系統(tǒng)的三個(gè)組件Metirc、reporter、metricRegistry之間的關(guān)系由metricRegistry來協(xié)同,只有把metric和reporter都注冊(cè)到metricRegistry中才能保證度量系統(tǒng)正常的工作

二:spark中的MetricsSystem
  1. MetricsSystem的體系結(jié)構(gòu)
  • Source:度量系統(tǒng)的數(shù)據(jù)源,也就是Metric體系中的metric組件
// source內(nèi)部維護(hù)了一個(gè)MetricRegistry,用于注冊(cè)Metrics
private[spark] trait Source {
  def sourceName: String
  def metricRegistry: MetricRegistry
}

//spark中JvmSource具體實(shí)現(xiàn),這個(gè)地方有個(gè)疑問,上面說到的metricRegistry使用來注冊(cè)的Metric的,這里Source并沒有繼承Metric啊,那么這個(gè)
// Source是怎么當(dāng)做Metric來使用的,這里就要看metricRegistry這個(gè)value了,MetricRegistry這個(gè)類其實(shí)也是繼承MetricSet(MetricSet繼承了Metric)的,該類里面提供了兩個(gè)方法
// register和registerAll,其中register使用注冊(cè)單個(gè)的Metric對(duì)象的,registerAll是用來注冊(cè)MetricSet,該方法最終調(diào)用的也是register方法
// MetricsSystem注冊(cè)Metric實(shí)際上是將source對(duì)象的metricRegistry注冊(cè)到MetricsSystem內(nèi)部的metricRegistry里面
// 
private[spark] class JvmSource extends Source {
  override val sourceName = "jvm"
  override val metricRegistry = new MetricRegistry()

  metricRegistry.registerAll(new GarbageCollectorMetricSet)
  metricRegistry.registerAll(new MemoryUsageGaugeSet)
  metricRegistry.registerAll(
    new BufferPoolMetricSet(ManagementFactory.getPlatformMBeanServer))
}

// MetricRegistry的register方法
 public <T extends Metric> T register(String name, T metric) throws IllegalArgumentException {
        if (metric instanceof MetricSet) {
            // 如果是MetricSet則調(diào)用registerAll
            //此處很關(guān)鍵,此處表明MetricRegistry可以注冊(cè)MetricRegistry的對(duì)象,因?yàn)镸etricRegistry是MetricSet的子類
            this.registerAll(name, (MetricSet)metric);
        } else {
            Metric existing = (Metric)this.metrics.putIfAbsent(name, metric);
            if (existing != null) {
                throw new IllegalArgumentException("A metric named " + name + " already exists");
            }

            this.onMetricAdded(name, metric);
        }

        return metric;
    }

// MetricRegistry的registerAll方法
 public void registerAll(MetricSet metrics) throws IllegalArgumentException {
        this.registerAll((String)null, metrics);
    }

private void registerAll(String prefix, MetricSet metrics) throws IllegalArgumentException {
        
        Iterator var3 = metrics.getMetrics().entrySet().iterator();

        while(var3.hasNext()) {
            Entry<String, Metric> entry = (Entry)var3.next();
            if (entry.getValue() instanceof MetricSet) {
                this.registerAll(name(prefix, (String)entry.getKey()), (MetricSet)entry.getValue());
            } else {
                // 最終還是調(diào)用register
                this.register(name(prefix, (String)entry.getKey()), (Metric)entry.getValue());
            }
        }

    }
  • Sink:度量系統(tǒng)度量結(jié)果的接收端,也就是Metric體系中的reporter
// Sink trait ,從這里看不出任何reporter的影子
private[spark] trait Sink {
  def start(): Unit
  def stop(): Unit
  def report(): Unit
}

// ConsoleSink的具體實(shí)現(xiàn),這個(gè)地方有點(diǎn)不合理,trait沒有任何信號(hào)透露出MetricRegistr 這個(gè)對(duì)象怎么和Sink產(chǎn)生關(guān)聯(lián),這里使用的在構(gòu)造函數(shù)中
// 傳入這個(gè)registry,這一點(diǎn)不符合抽象編程的規(guī)范,這里應(yīng)該把Sink改成一個(gè)具體的類或者類似于Source一樣的,維護(hù)一個(gè)MetricRegistr對(duì)象
// 從這個(gè)ConsoleSink可以看出,在Sink具體類實(shí)例化是會(huì)傳入一個(gè) MetricRegistr對(duì)象并初始化reporter,這樣Sink和reporter就關(guān)聯(lián)起來了
private[spark] class ConsoleSink(val property: Properties, val registry: MetricRegistry,
    securityMgr: SecurityManager) extends Sink {
  val CONSOLE_DEFAULT_PERIOD = 10
  val CONSOLE_DEFAULT_UNIT = "SECONDS"

  val CONSOLE_KEY_PERIOD = "period"
  val CONSOLE_KEY_UNIT = "unit"

  val pollPeriod = Option(property.getProperty(CONSOLE_KEY_PERIOD)) match {
    case Some(s) => s.toInt
    case None => CONSOLE_DEFAULT_PERIOD
  }

  val pollUnit: TimeUnit = Option(property.getProperty(CONSOLE_KEY_UNIT)) match {
    case Some(s) => TimeUnit.valueOf(s.toUpperCase(Locale.ROOT))
    case None => TimeUnit.valueOf(CONSOLE_DEFAULT_UNIT)
  }

  MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)

  // sink內(nèi)部持有的reporter對(duì)象,  將這個(gè)reporter注冊(cè)到registry中
  val reporter: ConsoleReporter = ConsoleReporter.forRegistry(registry)
      .convertDurationsTo(TimeUnit.MILLISECONDS)
      .convertRatesTo(TimeUnit.SECONDS)
      .build()

  override def start() {
    reporter.start(pollPeriod, pollUnit)
  }

  override def stop() {
    reporter.stop()
  }

  override def report() {
    reporter.report()
  }
}
  • MetricsSystem:度量系統(tǒng)的中樞大腦,里面維護(hù)了一個(gè)MetricRegistry的實(shí)例,Source和Sink的都是通過這個(gè)MetricRegistry注冊(cè)的。整體上spark中的MetricsSystem的設(shè)計(jì)思路和原生Metric的設(shè)計(jì)思路一樣,MetricsSystem其實(shí)就是對(duì)原生3個(gè)組件的封裝
// 內(nèi)部維護(hù)一個(gè)MetricRegistry對(duì)象,用來注冊(cè)Source和sink,使用該registry注冊(cè)的source和sink就可以協(xié)同工作了
private val registry = new MetricRegistry()
  1. MetricsSystem的工作原理
  • 初始化:MeticsSystem的初始化是在SparkContext中完成的,具體的是在SparkEnv創(chuàng)建的過程中創(chuàng)建的
//如果是driver端,則要等待taskScheduler提交作業(yè)后的app id,如果是executor的話則直接啟動(dòng)
val metricsSystem = if (isDriver) {
      // Don't start metrics system right now for Driver.
      // We need to wait for the task scheduler to give us an app ID.
      // Then we can start the metrics system.
      MetricsSystem.createMetricsSystem("driver", conf, securityManager)
    } else {
      // We need to set the executor ID before the MetricsSystem is created because sources and
      // sinks specified in the metrics configuration file will want to incorporate this executor's
      // ID into the metrics they report.
      conf.set("spark.executor.id", executorId)
      val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
      ms.start()
      ms
    }
  • 啟動(dòng):如果是driver端,需要等到taskScheduler啟動(dòng)app之后才會(huì)啟動(dòng)MetricsSystem(需要appId),如果是executor端的話,在創(chuàng)建之后即可啟動(dòng)
 _applicationId = _taskScheduler.applicationId()
    _applicationAttemptId = taskScheduler.applicationAttemptId()
    _conf.set("spark.app.id", _applicationId)
    if (_conf.getBoolean("spark.ui.reverseProxy", false)) {
      System.setProperty("spark.ui.proxyBase", "/proxy/" + _applicationId)
    }
    _ui.foreach(_.setAppId(_applicationId))
    _env.blockManager.initialize(_applicationId)

    // The metrics system for Driver need to be set spark.app.id to app ID.
    // So it should start after we get app ID from the task scheduler and set spark.app.id.
    _env.metricsSystem.start()
    _env.metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))

 // start()方法
 def start() {
    require(!running, "Attempting to start a MetricsSystem that is already running")
    running = true
    StaticSources.allSources.foreach(registerSource)
    registerSources()
    registerSinks()
    sinks.foreach(_.start)
  }

start()方法內(nèi)部調(diào)用了registerSources和registerSinks,也就是將source和sink注冊(cè)到MetricsSystem內(nèi)部對(duì)象MetricRegistry中

//先調(diào)用registerSources(),獲取到instance(driver,worker,executor等)對(duì)應(yīng)配置文件中的source
//然后調(diào)用registerSource(source: Source)方法,將source注冊(cè)到MetricsSystem中的內(nèi)部對(duì)象MetricRegistry中
private def registerSources() {
    val instConfig = metricsConfig.getInstance(instance)
    val sourceConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SOURCE_REGEX)

    // Register all the sources related to instance
    sourceConfigs.foreach { kv =>
      val classPath = kv._2.getProperty("class")
      try {
        val source = Utils.classForName(classPath).newInstance()
        registerSource(source.asInstanceOf[Source])
      } catch {
        case e: Exception => logError("Source class " + classPath + " cannot be instantiated", e)
      }
    }
  }

def registerSource(source: Source) {
    sources += source
    try {
      val regName = buildRegistryName(source)
      // 此處就是上面解釋Source里面說的將Source內(nèi)部的metricRegistry注冊(cè)到MetricsSystem里面的metricRegistry中
      registry.register(regName, source.metricRegistry)
    } catch {
      case e: IllegalArgumentException => logInfo("Metrics already registered", e)
    }
  }
// 注冊(cè)Sinks,重點(diǎn)是下面的newInstance
private def registerSinks() {
    val instConfig = metricsConfig.getInstance(instance)
    val sinkConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SINK_REGEX)

    sinkConfigs.foreach { kv =>
      val classPath = kv._2.getProperty("class")
      if (null != classPath) {
        try {
          // 實(shí)例化時(shí)傳入registry,并用這個(gè)registry注冊(cè)內(nèi)部的reporter對(duì)象,從抽象編程來說,這個(gè)地方很不合理
          val sink = Utils.classForName(classPath)
            .getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
            .newInstance(kv._2, registry, securityMgr)
          if (kv._1 == "servlet") {
            metricsServlet = Some(sink.asInstanceOf[MetricsServlet])
          } else {
            sinks += sink.asInstanceOf[Sink]
          }
        } catch {
          case e: Exception =>
            logError("Sink class " + classPath + " cannot be instantiated")
            throw e
        }
      }
    }
  }
}

注冊(cè)號(hào)Source和sink之后,調(diào)用循環(huán)調(diào)用sink的的start()方法,度量系統(tǒng)開始工作

三:Q&A
  1. Sink trait 設(shè)計(jì)有明顯的問題,怎么改進(jìn)?
  2. metricServlet這個(gè)類是spark默認(rèn)提供的Sink,可以通過http的方式訪問source度量的到的結(jié)果,但是在這個(gè)sink并沒有調(diào)用start()方法啟動(dòng),那么這個(gè)類是如果采集到source發(fā)送過來的數(shù)據(jù)的?
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容