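MetricsSystem, as the name suggests, measures the various metrics of a system. For example, it can collect JVM-related information on the driver, worker, and executor sides in order to assess server performance. Under the hood, Spark's MetricsSystem uses the third-party Metrics library, an open-source metrics toolkit that can count events, track the mean of a metric, and so on. MetricsSystem adds a layer of wrapping on top of Metrics, but its structure is largely the same, so once Metrics is understood, MetricsSystem is easy to follow.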
1: Metrics, the open-source metrics library
Let's start with two examples.
-
Gauges monitor the instantaneous value of a metric
import com.codahale.metrics.ConsoleReporter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

/**
 * Demo for gauges
 *
 * @author xiongmao
 * @create 2019-02-18 5:09 PM
 */
public class GaugesDemo {
    private static final MetricRegistry metrics = new MetricRegistry();
    private static Queue<String> queue = new LinkedBlockingDeque<String>();
    // Define the reporter, the receiving end that processes the measured values.
    // ConsoleReporter is used here, so the values are printed to the console.
    // ConsoleReporter.forRegistry binds the reporter to the MetricRegistry it reads from.
    private static ConsoleReporter reporter = ConsoleReporter.forRegistry(metrics).build();

    public static void main(String[] args) throws Exception {
        // Start the reporter so that it begins processing the measured values
        reporter.start(3, TimeUnit.SECONDS);
        // Define the Gauge, the measuring end of the system. It is a metric: Gauge extends Metric.
        Gauge<Integer> gauge = new Gauge<Integer>() {
            @Override
            public Integer getValue() {
                return queue.size();
            }
        };
        // Register the metric with the MetricRegistry so the reporter knows which metrics to read
        metrics.register(MetricRegistry.name(GaugesDemo.class, "pending-job", "size"), gauge);
        // JmxReporter jmxReporter = JmxReporter.forRegistry(metrics).build();
        // jmxReporter.start();
        for (int i = 0; i < 20; i++) {
            queue.add("a");
            Thread.sleep(1000);
        }
    }
}
-
Counters count occurrences of a metric
import com.codahale.metrics.ConsoleReporter;
import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry;
import java.util.concurrent.TimeUnit;

/**
 * Counter demo
 *
 * @author xiongmao
 * @create 2019-02-19 1:49 PM
 */
public class CounterDemo {
    private static final MetricRegistry metrics = new MetricRegistry();
    // Define the reporter and bind it to the MetricRegistry
    private static final ConsoleReporter reporter = ConsoleReporter.forRegistry(metrics).build();
    // Define the Counter; like Gauge, Counter extends Metric
    private static final Counter pendingJobs = new Counter();

    public static void add(String str) {
        pendingJobs.inc();
    }

    public static void main(String[] args) throws Exception {
        // Register the metric with the same MetricRegistry the reporter is bound to
        metrics.register(MetricRegistry.name(CounterDemo.class, "pending-jobs"), pendingJobs);
        reporter.start(3, TimeUnit.SECONDS);
        while (true) {
            add("1");
            Thread.sleep(1000);
        }
    }
}
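The two examples above show that MetricRegistry is the central hub of the Metrics system: metrics and reporters must be attached to the same MetricRegistry in order to work together. A reasonable guess is that one reporter can receive the results of several metrics, and the results of one metric can be received by several reporters, as long as those reporters and metrics all share the same MetricRegistry.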
-
Multiple reporters and multiple metrics attached to the same MetricRegistry
import com.codahale.metrics.ConsoleReporter;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

/**
 * Counter demo
 * One MetricRegistry can hold several metrics and serve several reporters;
 * the output of every metric is received by each reporter.
 * @author xiongmao
 * @create 2019-02-19 1:49 PM
 */
public class CounterDemo {
    private static final MetricRegistry metrics = new MetricRegistry();
    private static final ConsoleReporter reporter = ConsoleReporter.forRegistry(metrics).build();
    private static final ConsoleReporter reporter1 = ConsoleReporter.forRegistry(metrics).build();
    private static Queue<String> queue = new LinkedBlockingDeque<String>();
    private static final Counter pendingJobs = new Counter();

    public static void add(String str) {
        pendingJobs.inc();
    }

    public static void main(String[] args) throws Exception {
        Gauge<Integer> gauge = new Gauge<Integer>() {
            @Override
            public Integer getValue() {
                return queue.size();
            }
        };
        metrics.register(MetricRegistry.name(CounterDemo.class, "pending-jobs"), pendingJobs);
        metrics.register(MetricRegistry.name(CounterDemo.class, "gauge"), gauge);
        reporter.start(3, TimeUnit.SECONDS);
        reporter1.start(3, TimeUnit.SECONDS);
        while (true) {
            add("1");
            queue.add("1");
            Thread.sleep(1000);
        }
    }
}
-
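Diagram of the relationship among Metric, reporter, and MetricRegistry

The three components of the Metrics system, Metric, reporter, and MetricRegistry, are coordinated by the MetricRegistry: the system works properly only when both the metrics and the reporters are attached to the same MetricRegistry.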
2: MetricsSystem in Spark
-
Architecture of MetricsSystem
- Source: the data source of the metrics system, that is, the metric component in the Metrics model
// A Source maintains its own MetricRegistry, in which its metrics are registered
private[spark] trait Source {
  def sourceName: String
  def metricRegistry: MetricRegistry
}
// JvmSource is one concrete Source in Spark. A question arises here: a MetricRegistry registers
// Metric objects, yet Source does not extend Metric, so how can a Source be used as a Metric?
// The answer lies in its metricRegistry value. MetricRegistry itself implements MetricSet
// (and MetricSet extends Metric). It provides two methods, register and registerAll:
// register registers a single Metric, while registerAll registers a MetricSet and
// ultimately calls register as well.
// When MetricsSystem registers a source, it actually registers the source's metricRegistry
// into the MetricRegistry held inside MetricsSystem.
private[spark] class JvmSource extends Source {
  override val sourceName = "jvm"
  override val metricRegistry = new MetricRegistry()
  metricRegistry.registerAll(new GarbageCollectorMetricSet)
  metricRegistry.registerAll(new MemoryUsageGaugeSet)
  metricRegistry.registerAll(
    new BufferPoolMetricSet(ManagementFactory.getPlatformMBeanServer))
}
// The register method of MetricRegistry
public <T extends Metric> T register(String name, T metric) throws IllegalArgumentException {
    if (metric instanceof MetricSet) {
        // If the metric is a MetricSet, delegate to registerAll.
        // This is the key point: a MetricRegistry can register another MetricRegistry,
        // because MetricRegistry is a subtype of MetricSet.
        this.registerAll(name, (MetricSet)metric);
    } else {
        Metric existing = (Metric)this.metrics.putIfAbsent(name, metric);
        if (existing != null) {
            throw new IllegalArgumentException("A metric named " + name + " already exists");
        }
        this.onMetricAdded(name, metric);
    }
    return metric;
}

// The registerAll methods of MetricRegistry
public void registerAll(MetricSet metrics) throws IllegalArgumentException {
    this.registerAll((String)null, metrics);
}

private void registerAll(String prefix, MetricSet metrics) throws IllegalArgumentException {
    Iterator var3 = metrics.getMetrics().entrySet().iterator();
    while(var3.hasNext()) {
        Entry<String, Metric> entry = (Entry)var3.next();
        if (entry.getValue() instanceof MetricSet) {
            this.registerAll(name(prefix, (String)entry.getKey()), (MetricSet)entry.getValue());
        } else {
            // In the end, register is still what gets called
            this.register(name(prefix, (String)entry.getKey()), (Metric)entry.getValue());
        }
    }
}
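
To see why registering one registry inside another works, the flattening performed by register and registerAll can be modelled with plain JDK collections. The sketch below is a simplified model and not the Metrics library's code: SketchMetric, SketchMetricSet, SketchValue, SketchRegistry, and NestedRegistrySketch are hypothetical names, names are joined with a plain dot, and listener callbacks are left out. The prefix "app-1.driver.jvm" is only an illustrative registry name.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-ins for the Metric and MetricSet interfaces.
interface SketchMetric {}

interface SketchMetricSet extends SketchMetric {
    Map<String, SketchMetric> getMetrics();
}

// A leaf metric that simply holds a value.
final class SketchValue implements SketchMetric {
    final long value;

    SketchValue(long value) {
        this.value = value;
    }
}

// A registry is itself a metric set, so one registry can be registered inside another.
final class SketchRegistry implements SketchMetricSet {
    private final Map<String, SketchMetric> metrics = new LinkedHashMap<>();

    @Override
    public Map<String, SketchMetric> getMetrics() {
        return metrics;
    }

    void register(String name, SketchMetric metric) {
        if (metric instanceof SketchMetricSet) {
            // Flatten the set: each child is registered again under "name.child".
            Map<String, SketchMetric> children = ((SketchMetricSet) metric).getMetrics();
            for (Map.Entry<String, SketchMetric> e : children.entrySet()) {
                register(name + "." + e.getKey(), e.getValue());
            }
        } else if (metrics.putIfAbsent(name, metric) != null) {
            throw new IllegalArgumentException("A metric named " + name + " already exists");
        }
    }
}

final class NestedRegistrySketch {
    static SketchRegistry build() {
        // Plays the role of a Source's own registry.
        SketchRegistry source = new SketchRegistry();
        source.register("heap.used", new SketchValue(42));
        source.register("heap.max", new SketchValue(100));
        // Plays the role of the registry held by MetricsSystem.
        SketchRegistry system = new SketchRegistry();
        system.register("app-1.driver.jvm", source);
        return system;
    }

    public static void main(String[] args) {
        // Prints [app-1.driver.jvm.heap.used, app-1.driver.jvm.heap.max]
        System.out.println(build().getMetrics().keySet());
    }
}
```

In this model the outer registry ends up holding the leaf metrics directly under prefixed names, and the inner registry is only walked at registration time. Registering the same source twice would hit the duplicate-name check, which matches the IllegalArgumentException that registerSource catches further down.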
- Sink: the receiving end for the measured results, that is, the reporter in the Metrics model
// The Sink trait; nothing here hints at a reporter
private[spark] trait Sink {
  def start(): Unit
  def stop(): Unit
  def report(): Unit
}
// The ConsoleSink implementation. Something is a little off here: nothing in the trait indicates
// how a MetricRegistry becomes associated with a Sink. Instead, the registry is passed in through
// the constructor, which does not fit well with programming to abstractions. Sink could be turned
// into a class, or maintain a MetricRegistry the way Source does.
// ConsoleSink shows that when a concrete Sink is instantiated, a MetricRegistry is passed in and
// used to initialize the reporter; this is how Sink and reporter are tied together.
private[spark] class ConsoleSink(val property: Properties, val registry: MetricRegistry,
    securityMgr: SecurityManager) extends Sink {
  val CONSOLE_DEFAULT_PERIOD = 10
  val CONSOLE_DEFAULT_UNIT = "SECONDS"
  val CONSOLE_KEY_PERIOD = "period"
  val CONSOLE_KEY_UNIT = "unit"
  val pollPeriod = Option(property.getProperty(CONSOLE_KEY_PERIOD)) match {
    case Some(s) => s.toInt
    case None => CONSOLE_DEFAULT_PERIOD
  }
  val pollUnit: TimeUnit = Option(property.getProperty(CONSOLE_KEY_UNIT)) match {
    case Some(s) => TimeUnit.valueOf(s.toUpperCase(Locale.ROOT))
    case None => TimeUnit.valueOf(CONSOLE_DEFAULT_UNIT)
  }
  MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)
  // The reporter held inside the sink, bound to the registry that was passed in
  val reporter: ConsoleReporter = ConsoleReporter.forRegistry(registry)
    .convertDurationsTo(TimeUnit.MILLISECONDS)
    .convertRatesTo(TimeUnit.SECONDS)
    .build()
  override def start() {
    reporter.start(pollPeriod, pollUnit)
  }
  override def stop() {
    reporter.stop()
  }
  override def report() {
    reporter.report()
  }
}
- MetricsSystem: the central hub of the metrics system. It maintains a MetricRegistry instance, and both Sources and Sinks are registered through it. Overall, the design of Spark's MetricsSystem mirrors that of the original Metrics library; MetricsSystem is essentially a wrapper around the three original components.
// The internal MetricRegistry used to register sources and sinks;
// sources and sinks registered through this registry can work together
private val registry = new MetricRegistry()
-
How MetricsSystem works
- Initialization: MetricsSystem is initialized in SparkContext; more precisely, it is created while SparkEnv is being created
// On the driver, startup has to wait for the app ID that the taskScheduler provides;
// on an executor, the metrics system is started right away
val metricsSystem = if (isDriver) {
  // Don't start metrics system right now for Driver.
  // We need to wait for the task scheduler to give us an app ID.
  // Then we can start the metrics system.
  MetricsSystem.createMetricsSystem("driver", conf, securityManager)
} else {
  // We need to set the executor ID before the MetricsSystem is created because sources and
  // sinks specified in the metrics configuration file will want to incorporate this executor's
  // ID into the metrics they report.
  conf.set("spark.executor.id", executorId)
  val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
  ms.start()
  ms
}
- Startup: on the driver, MetricsSystem is started only after the taskScheduler has started the app, because the app ID is required; on an executor it can be started immediately after creation
_applicationId = _taskScheduler.applicationId()
_applicationAttemptId = taskScheduler.applicationAttemptId()
_conf.set("spark.app.id", _applicationId)
if (_conf.getBoolean("spark.ui.reverseProxy", false)) {
  System.setProperty("spark.ui.proxyBase", "/proxy/" + _applicationId)
}
_ui.foreach(_.setAppId(_applicationId))
_env.blockManager.initialize(_applicationId)
// The metrics system for Driver need to be set spark.app.id to app ID.
// So it should start after we get app ID from the task scheduler and set spark.app.id.
_env.metricsSystem.start()
_env.metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))

// The start() method
def start() {
  require(!running, "Attempting to start a MetricsSystem that is already running")
  running = true
  StaticSources.allSources.foreach(registerSource)
  registerSources()
  registerSinks()
  sinks.foreach(_.start)
}
Internally, start() calls registerSources and registerSinks, which register the sources and sinks with the MetricRegistry held inside MetricsSystem.
// registerSources() first reads the sources configured for this instance
// (driver, worker, executor, and so on) from the configuration file,
// then calls registerSource(source: Source) to register each source
// with the MetricRegistry held inside MetricsSystem
private def registerSources() {
  val instConfig = metricsConfig.getInstance(instance)
  val sourceConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SOURCE_REGEX)
  // Register all the sources related to instance
  sourceConfigs.foreach { kv =>
    val classPath = kv._2.getProperty("class")
    try {
      val source = Utils.classForName(classPath).newInstance()
      registerSource(source.asInstanceOf[Source])
    } catch {
      case e: Exception => logError("Source class " + classPath + " cannot be instantiated", e)
    }
  }
}

def registerSource(source: Source) {
  sources += source
  try {
    val regName = buildRegistryName(source)
    // This is the step described in the Source section: the source's own metricRegistry
    // is registered into the metricRegistry of MetricsSystem
    registry.register(regName, source.metricRegistry)
  } catch {
    case e: IllegalArgumentException => logInfo("Metrics already registered", e)
  }
}

// Register the sinks; the key part is the newInstance call below
private def registerSinks() {
  val instConfig = metricsConfig.getInstance(instance)
  val sinkConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SINK_REGEX)
  sinkConfigs.foreach { kv =>
    val classPath = kv._2.getProperty("class")
    if (null != classPath) {
      try {
        // The registry is passed in at instantiation, and the sink binds its internal reporter
        // to it. From the standpoint of programming to abstractions this is rather awkward.
        val sink = Utils.classForName(classPath)
          .getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
          .newInstance(kv._2, registry, securityMgr)
        if (kv._1 == "servlet") {
          metricsServlet = Some(sink.asInstanceOf[MetricsServlet])
        } else {
          sinks += sink.asInstanceOf[Sink]
        }
      } catch {
        case e: Exception =>
          logError("Sink class " + classPath + " cannot be instantiated")
          throw e
      }
    }
  }
}
Once the sources and sinks have been registered, start() loops over the sinks and calls each one's start() method, and the metrics system begins to work.
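
The sink registration step relies on a convention rather than on the Sink trait: every sink class is expected to expose a constructor with a fixed parameter list, which is then looked up reflectively. The following sketch reproduces that pattern with JDK classes only. It is an illustration under stated assumptions, not Spark's code: DemoSink, PrintingSink, SinkLoaderSketch, and the "label" property are hypothetical, a plain Map stands in for MetricRegistry, and the SecurityManager argument is omitted.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

// Hypothetical stand-in for Spark's Sink trait.
interface DemoSink {
    void start();
    void report();
}

// Hypothetical sink. Like ConsoleSink, it receives the shared registry through its constructor.
final class PrintingSink implements DemoSink {
    private final Properties props;
    private final Map<String, Long> registry;
    boolean started = false;

    public PrintingSink(Properties props, Map<String, Long> registry) {
        this.props = props;
        this.registry = registry;
    }

    @Override
    public void start() {
        started = true;
    }

    @Override
    public void report() {
        System.out.println(props.getProperty("label", "metrics") + " " + registry);
    }
}

final class SinkLoaderSketch {
    // Builds one sink per config entry by looking up a constructor with a fixed signature.
    static List<DemoSink> loadSinks(Map<String, Properties> sinkConfigs,
                                    Map<String, Long> registry)
            throws ReflectiveOperationException {
        List<DemoSink> sinks = new ArrayList<>();
        for (Properties props : sinkConfigs.values()) {
            String className = props.getProperty("class");
            if (className == null) {
                continue; // entries without a class name are skipped
            }
            Object sink = Class.forName(className)
                    .getConstructor(Properties.class, Map.class)
                    .newInstance(props, registry);
            sinks.add((DemoSink) sink);
        }
        return sinks;
    }

    public static void main(String[] args) throws Exception {
        Map<String, Long> registry = new LinkedHashMap<>();
        registry.put("pending-jobs", 3L);

        Properties console = new Properties();
        console.setProperty("class", PrintingSink.class.getName());
        Map<String, Properties> configs = new LinkedHashMap<>();
        configs.put("console", console);

        // Mirrors the end of start(): every loaded sink is started in turn.
        for (DemoSink sink : loadSinks(configs, registry)) {
            sink.start();
            sink.report();
        }
    }
}
```

The trade-off is visible in loadSinks: a sink class that lacks the expected constructor compiles fine and only fails at runtime with NoSuchMethodException, which is the weakness the first question below points at.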
3: Q&A
- The design of the Sink trait has an obvious problem. How could it be improved?
- MetricsServlet is a Sink that Spark provides by default; it exposes the results measured by the sources over HTTP. However, start() is never called on this sink, so how does it obtain the data produced by the sources?
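
One way to think about the second question is the difference between push and pull. A reporter-based sink such as ConsoleSink needs start() because it polls the registry on a schedule. A sink that is queried over HTTP can instead read the shared registry at the moment a request arrives, so it needs no polling loop at all; this fits the code quoted above, where the handlers returned by getServletHandlers are attached to the UI right after the metrics system starts. The sketch below illustrates the pull idea only. PullSinkSketch and its snapshot method are hypothetical, and a Map of suppliers stands in for MetricRegistry.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

// Hypothetical pull-style sink: no background thread, values are read per request.
final class PullSinkSketch {
    private final Map<String, Supplier<Long>> registry;

    PullSinkSketch(Map<String, Supplier<Long>> registry) {
        this.registry = registry;
    }

    // Would be invoked by an HTTP handler; renders the current values as a JSON-like string.
    String snapshot() {
        // Sort by name so the rendering is deterministic.
        Map<String, Supplier<Long>> sorted = new TreeMap<>(registry);
        StringBuilder sb = new StringBuilder("{");
        for (Map.Entry<String, Supplier<Long>> e : sorted.entrySet()) {
            if (sb.length() > 1) {
                sb.append(",");
            }
            sb.append('"').append(e.getKey()).append("\":").append(e.getValue().get());
        }
        return sb.append("}").toString();
    }

    public static void main(String[] args) {
        AtomicLong pending = new AtomicLong();
        Map<String, Supplier<Long>> registry = new ConcurrentHashMap<>();
        registry.put("pending", pending::get);

        PullSinkSketch sink = new PullSinkSketch(registry);
        System.out.println(sink.snapshot()); // {"pending":0}
        pending.incrementAndGet();
        System.out.println(sink.snapshot()); // {"pending":1}
    }
}
```

Because the sink holds a reference to the same registry the sources were registered in, each call to snapshot sees the latest values without any data ever being "sent" to it.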