hadoop JvmMetrics 源码
haddop JvmMetrics 代码
文件路径:/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/source/JvmMetrics.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.metrics2.source;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.lang.management.GarbageCollectorMXBean;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsInfo;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.Interns;
import static org.apache.hadoop.metrics2.source.JvmMetricsInfo.*;
import static org.apache.hadoop.metrics2.impl.MsInfo.*;
import org.apache.hadoop.util.GcTimeMonitor;
import org.apache.hadoop.util.JvmPauseMonitor;
/**
* JVM and logging related metrics.
* Mostly used by various servers as a part of the metrics they export.
*/
@InterfaceAudience.Private
public class JvmMetrics implements MetricsSource {
enum Singleton {
INSTANCE;
JvmMetrics impl;
synchronized JvmMetrics init(String processName, String sessionId) {
if (impl == null) {
impl = create(processName, sessionId, DefaultMetricsSystem.instance());
}
return impl;
}
synchronized void shutdown() {
DefaultMetricsSystem.instance().unregisterSource(JvmMetrics.name());
impl = null;
}
}
@VisibleForTesting
public synchronized void registerIfNeeded(){
// during tests impl might exist, but is not registered
MetricsSystem ms = DefaultMetricsSystem.instance();
if (ms.getSource("JvmMetrics") == null) {
ms.register(JvmMetrics.name(), JvmMetrics.description(), this);
}
}
static final float M = 1024*1024;
static public final float MEMORY_MAX_UNLIMITED_MB = -1;
final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
final List<GarbageCollectorMXBean> gcBeans =
ManagementFactory.getGarbageCollectorMXBeans();
private ThreadMXBean threadMXBean;
final String processName, sessionId;
private JvmPauseMonitor pauseMonitor = null;
final ConcurrentHashMap<String, MetricsInfo[]> gcInfoCache =
new ConcurrentHashMap<String, MetricsInfo[]>();
private GcTimeMonitor gcTimeMonitor = null;
@VisibleForTesting
JvmMetrics(String processName, String sessionId, boolean useThreadMXBean) {
this.processName = processName;
this.sessionId = sessionId;
if (useThreadMXBean) {
this.threadMXBean = ManagementFactory.getThreadMXBean();
}
}
public void setPauseMonitor(final JvmPauseMonitor pauseMonitor) {
this.pauseMonitor = pauseMonitor;
}
public void setGcTimeMonitor(GcTimeMonitor gcTimeMonitor) {
Preconditions.checkNotNull(gcTimeMonitor);
this.gcTimeMonitor = gcTimeMonitor;
}
public static JvmMetrics create(String processName, String sessionId,
MetricsSystem ms) {
// Reloading conf instead of getting from outside since it's redundant in
// code level to update all the callers across lots of modules,
// this method is called at most once for components (NN/DN/RM/NM/...)
// so that the overall cost is not expensive.
boolean useThreadMXBean = new Configuration().getBoolean(
CommonConfigurationKeys.HADOOP_METRICS_JVM_USE_THREAD_MXBEAN,
CommonConfigurationKeys.HADOOP_METRICS_JVM_USE_THREAD_MXBEAN_DEFAULT);
return ms.register(JvmMetrics.name(), JvmMetrics.description(),
new JvmMetrics(processName, sessionId, useThreadMXBean));
}
public static void reattach(MetricsSystem ms, JvmMetrics jvmMetrics) {
ms.register(JvmMetrics.name(), JvmMetrics.description(), jvmMetrics);
}
public static JvmMetrics initSingleton(String processName, String sessionId) {
return Singleton.INSTANCE.init(processName, sessionId);
}
/**
* Shutdown the JvmMetrics singleton. This is not necessary if the JVM itself
* is shutdown, but may be necessary for scenarios where JvmMetrics instance
* needs to be re-created while the JVM is still around. One such scenario
* is unit-testing.
*/
public static void shutdownSingleton() {
Singleton.INSTANCE.shutdown();
}
@Override
public void getMetrics(MetricsCollector collector, boolean all) {
MetricsRecordBuilder rb = collector.addRecord(JvmMetrics)
.setContext("jvm").tag(ProcessName, processName)
.tag(SessionId, sessionId);
getMemoryUsage(rb);
getGcUsage(rb);
if (threadMXBean != null) {
getThreadUsage(rb);
} else {
getThreadUsageFromGroup(rb);
}
}
private void getMemoryUsage(MetricsRecordBuilder rb) {
MemoryUsage memNonHeap = memoryMXBean.getNonHeapMemoryUsage();
MemoryUsage memHeap = memoryMXBean.getHeapMemoryUsage();
Runtime runtime = Runtime.getRuntime();
rb.addGauge(MemNonHeapUsedM, memNonHeap.getUsed() / M)
.addGauge(MemNonHeapCommittedM, memNonHeap.getCommitted() / M)
.addGauge(MemNonHeapMaxM, calculateMaxMemoryUsage(memNonHeap))
.addGauge(MemHeapUsedM, memHeap.getUsed() / M)
.addGauge(MemHeapCommittedM, memHeap.getCommitted() / M)
.addGauge(MemHeapMaxM, calculateMaxMemoryUsage(memHeap))
.addGauge(MemMaxM, runtime.maxMemory() / M);
}
private float calculateMaxMemoryUsage(MemoryUsage memHeap) {
long max = memHeap.getMax() ;
if (max == -1) {
return MEMORY_MAX_UNLIMITED_MB;
}
return max / M;
}
private void getGcUsage(MetricsRecordBuilder rb) {
long count = 0;
long timeMillis = 0;
for (GarbageCollectorMXBean gcBean : gcBeans) {
long c = gcBean.getCollectionCount();
long t = gcBean.getCollectionTime();
MetricsInfo[] gcInfo = getGcInfo(gcBean.getName());
rb.addCounter(gcInfo[0], c).addCounter(gcInfo[1], t);
count += c;
timeMillis += t;
}
rb.addCounter(GcCount, count)
.addCounter(GcTimeMillis, timeMillis);
if (pauseMonitor != null) {
rb.addCounter(GcNumWarnThresholdExceeded,
pauseMonitor.getNumGcWarnThresholdExceeded());
rb.addCounter(GcNumInfoThresholdExceeded,
pauseMonitor.getNumGcInfoThresholdExceeded());
rb.addCounter(GcTotalExtraSleepTime,
pauseMonitor.getTotalGcExtraSleepTime());
}
if (gcTimeMonitor != null) {
rb.addGauge(GcTimePercentage,
gcTimeMonitor.getLatestGcData().getGcTimePercentage());
}
}
private MetricsInfo[] getGcInfo(String gcName) {
MetricsInfo[] gcInfo = gcInfoCache.get(gcName);
if (gcInfo == null) {
gcInfo = new MetricsInfo[2];
gcInfo[0] = Interns.info("GcCount" + gcName, "GC Count for " + gcName);
gcInfo[1] = Interns
.info("GcTimeMillis" + gcName, "GC Time for " + gcName);
MetricsInfo[] previousGcInfo = gcInfoCache.putIfAbsent(gcName, gcInfo);
if (previousGcInfo != null) {
return previousGcInfo;
}
}
return gcInfo;
}
private void getThreadUsage(MetricsRecordBuilder rb) {
int threadsNew = 0;
int threadsRunnable = 0;
int threadsBlocked = 0;
int threadsWaiting = 0;
int threadsTimedWaiting = 0;
int threadsTerminated = 0;
long threadIds[] = threadMXBean.getAllThreadIds();
for (ThreadInfo threadInfo : threadMXBean.getThreadInfo(threadIds, 0)) {
if (threadInfo == null) continue; // race protection
switch (threadInfo.getThreadState()) {
case NEW: threadsNew++; break;
case RUNNABLE: threadsRunnable++; break;
case BLOCKED: threadsBlocked++; break;
case WAITING: threadsWaiting++; break;
case TIMED_WAITING: threadsTimedWaiting++; break;
case TERMINATED: threadsTerminated++; break;
}
}
rb.addGauge(ThreadsNew, threadsNew)
.addGauge(ThreadsRunnable, threadsRunnable)
.addGauge(ThreadsBlocked, threadsBlocked)
.addGauge(ThreadsWaiting, threadsWaiting)
.addGauge(ThreadsTimedWaiting, threadsTimedWaiting)
.addGauge(ThreadsTerminated, threadsTerminated);
}
private void getThreadUsageFromGroup(MetricsRecordBuilder rb) {
int threadsNew = 0;
int threadsRunnable = 0;
int threadsBlocked = 0;
int threadsWaiting = 0;
int threadsTimedWaiting = 0;
int threadsTerminated = 0;
ThreadGroup threadGroup = Thread.currentThread().getThreadGroup();
Thread[] threads = new Thread[threadGroup.activeCount()];
threadGroup.enumerate(threads);
for (Thread thread : threads) {
if (thread == null) {
// race protection
continue;
}
switch (thread.getState()) {
case NEW: threadsNew++; break;
case RUNNABLE: threadsRunnable++; break;
case BLOCKED: threadsBlocked++; break;
case WAITING: threadsWaiting++; break;
case TIMED_WAITING: threadsTimedWaiting++; break;
case TERMINATED: threadsTerminated++; break;
default:
}
}
rb.addGauge(ThreadsNew, threadsNew)
.addGauge(ThreadsRunnable, threadsRunnable)
.addGauge(ThreadsBlocked, threadsBlocked)
.addGauge(ThreadsWaiting, threadsWaiting)
.addGauge(ThreadsTimedWaiting, threadsTimedWaiting)
.addGauge(ThreadsTerminated, threadsTerminated);
}
}
相关信息
相关文章
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦