hadoop HumanReadableHistoryViewerPrinter source code
File path: /hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HumanReadableHistoryViewerPrinter.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.jobhistory;
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapred.TaskStatus;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.util.StringUtils;
import java.io.IOException;
import java.io.PrintStream;
import java.text.DecimalFormat;
import java.text.Format;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
/**
* Used by the {@link HistoryViewer} to print job history in a human-readable
* format.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
class HumanReadableHistoryViewerPrinter implements HistoryViewerPrinter {
private JobHistoryParser.JobInfo job;
private final FastDateFormat dateFormat;
private boolean printAll;
private String scheme;
HumanReadableHistoryViewerPrinter(JobHistoryParser.JobInfo job,
boolean printAll, String scheme) {
this(job, printAll, scheme, TimeZone.getDefault());
}
HumanReadableHistoryViewerPrinter(JobHistoryParser.JobInfo job,
boolean printAll, String scheme,
TimeZone tz) {
this.job = job;
this.printAll = printAll;
this.scheme = scheme;
this.dateFormat = FastDateFormat.getInstance("d-MMM-yyyy HH:mm:ss", tz);
}
/**
* Print out the Job History to the given {@link PrintStream} in a
* human-readable format.
* @param ps the {@link PrintStream} to print to
* @throws IOException when a problem occurs while printing
*/
@Override
public void print(PrintStream ps) throws IOException {
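    // Print the job overview, per-task-type summary and timing analysis first,
    // then the failed/killed task lists, optionally all successful tasks and
    // every task attempt, and finally failed/killed attempts grouped by host.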
printJobDetails(ps);
printTaskSummary(ps);
printJobAnalysis(ps);
printTasks(ps, TaskType.JOB_SETUP, TaskStatus.State.FAILED.toString());
printTasks(ps, TaskType.JOB_SETUP, TaskStatus.State.KILLED.toString());
printTasks(ps, TaskType.MAP, TaskStatus.State.FAILED.toString());
printTasks(ps, TaskType.MAP, TaskStatus.State.KILLED.toString());
printTasks(ps, TaskType.REDUCE, TaskStatus.State.FAILED.toString());
printTasks(ps, TaskType.REDUCE, TaskStatus.State.KILLED.toString());
printTasks(ps, TaskType.JOB_CLEANUP, TaskStatus.State.FAILED.toString());
printTasks(ps, TaskType.JOB_CLEANUP,
JobStatus.getJobRunState(JobStatus.KILLED));
if (printAll) {
printTasks(ps, TaskType.JOB_SETUP, TaskStatus.State.SUCCEEDED.toString());
printTasks(ps, TaskType.MAP, TaskStatus.State.SUCCEEDED.toString());
printTasks(ps, TaskType.REDUCE, TaskStatus.State.SUCCEEDED.toString());
printTasks(ps, TaskType.JOB_CLEANUP,
TaskStatus.State.SUCCEEDED.toString());
printAllTaskAttempts(ps, TaskType.JOB_SETUP);
printAllTaskAttempts(ps, TaskType.MAP);
printAllTaskAttempts(ps, TaskType.REDUCE);
printAllTaskAttempts(ps, TaskType.JOB_CLEANUP);
}
HistoryViewer.FilteredJob filter = new HistoryViewer.FilteredJob(job,
TaskStatus.State.FAILED.toString());
printFailedAttempts(ps, filter);
filter = new HistoryViewer.FilteredJob(job,
TaskStatus.State.KILLED.toString());
printFailedAttempts(ps, filter);
}
private void printJobDetails(PrintStream ps) {
StringBuilder jobDetails = new StringBuilder();
jobDetails.append("\nHadoop job: ").append(job.getJobId());
jobDetails.append("\n=====================================");
jobDetails.append("\nUser: ").append(job.getUsername());
jobDetails.append("\nJobName: ").append(job.getJobname());
jobDetails.append("\nJobConf: ").append(job.getJobConfPath());
jobDetails.append("\nSubmitted At: ").append(StringUtils.
getFormattedTimeWithDiff(dateFormat,
job.getSubmitTime(), 0));
jobDetails.append("\nLaunched At: ").append(StringUtils.
getFormattedTimeWithDiff(dateFormat,
job.getLaunchTime(),
job.getSubmitTime()));
jobDetails.append("\nFinished At: ").append(StringUtils.
getFormattedTimeWithDiff(dateFormat,
job.getFinishTime(),
job.getLaunchTime()));
jobDetails.append("\nStatus: ").append(((job.getJobStatus() == null) ?
"Incomplete" :job.getJobStatus()));
printJobCounters(jobDetails, job.getTotalCounters(), job.getMapCounters(),
job.getReduceCounters());
jobDetails.append("\n");
jobDetails.append("\n=====================================");
ps.println(jobDetails);
}
private void printJobCounters(StringBuilder buff, Counters totalCounters,
Counters mapCounters, Counters reduceCounters) {
// Killed jobs might not have counters
if (totalCounters != null) {
buff.append("\nCounters: \n\n");
buff.append(String.format("|%1$-30s|%2$-30s|%3$-10s|%4$-10s|%5$-10s|",
"Group Name",
"Counter name",
"Map Value",
"Reduce Value",
"Total Value"));
buff.append("\n------------------------------------------" +
"---------------------------------------------");
for (CounterGroup counterGroup : totalCounters) {
String groupName = counterGroup.getName();
CounterGroup totalGroup = totalCounters.getGroup(groupName);
CounterGroup mapGroup = mapCounters.getGroup(groupName);
CounterGroup reduceGroup = reduceCounters.getGroup(groupName);
Format decimal = new DecimalFormat();
Iterator<Counter> ctrItr =
totalGroup.iterator();
while (ctrItr.hasNext()) {
org.apache.hadoop.mapreduce.Counter counter = ctrItr.next();
String name = counter.getName();
String mapValue =
decimal.format(mapGroup.findCounter(name).getValue());
String reduceValue =
decimal.format(reduceGroup.findCounter(name).getValue());
String totalValue =
decimal.format(counter.getValue());
buff.append(
String.format("%n|%1$-30s|%2$-30s|%3$-10s|%4$-10s|%5$-10s",
totalGroup.getDisplayName(),
counter.getDisplayName(),
mapValue, reduceValue, totalValue));
}
}
}
}
private void printAllTaskAttempts(PrintStream ps, TaskType taskType) {
Map<TaskID, JobHistoryParser.TaskInfo> tasks = job.getAllTasks();
StringBuilder taskList = new StringBuilder();
taskList.append("\n").append(taskType);
taskList.append(" task list for ").append(job.getJobId());
taskList.append("\nTaskId\t\tStartTime");
if (TaskType.REDUCE.equals(taskType)) {
taskList.append("\tShuffleFinished\tSortFinished");
}
taskList.append("\tFinishTime\tHostName\tError\tTaskLogs");
taskList.append("\n====================================================");
ps.println(taskList.toString());
for (JobHistoryParser.TaskInfo task : tasks.values()) {
for (JobHistoryParser.TaskAttemptInfo attempt :
task.getAllTaskAttempts().values()) {
if (taskType.equals(task.getTaskType())){
taskList.setLength(0);
taskList.append(attempt.getAttemptId()).append("\t");
taskList.append(StringUtils.getFormattedTimeWithDiff(dateFormat,
attempt.getStartTime(), 0)).append("\t");
if (TaskType.REDUCE.equals(taskType)) {
taskList.append(StringUtils.getFormattedTimeWithDiff(dateFormat,
attempt.getShuffleFinishTime(),
attempt.getStartTime()));
taskList.append("\t");
taskList.append(StringUtils.getFormattedTimeWithDiff(dateFormat,
attempt.getSortFinishTime(),
attempt.getShuffleFinishTime()));
}
taskList.append(StringUtils.getFormattedTimeWithDiff(dateFormat,
attempt.getFinishTime(),
attempt.getStartTime()));
taskList.append("\t");
taskList.append(attempt.getHostname()).append("\t");
taskList.append(attempt.getError());
String taskLogsUrl = HistoryViewer.getTaskLogsUrl(scheme, attempt);
taskList.append(taskLogsUrl != null ? taskLogsUrl : "n/a");
ps.println(taskList);
}
}
}
}
private void printTaskSummary(PrintStream ps) {
HistoryViewer.SummarizedJob ts = new HistoryViewer.SummarizedJob(job);
StringBuilder taskSummary = new StringBuilder();
taskSummary.append("\nTask Summary");
taskSummary.append("\n============================");
taskSummary.append("\nKind\tTotal\t");
taskSummary.append("Successful\tFailed\tKilled\tStartTime\tFinishTime");
taskSummary.append("\n");
taskSummary.append("\nSetup\t").append(ts.totalSetups);
taskSummary.append("\t").append(ts.numFinishedSetups);
taskSummary.append("\t\t").append(ts.numFailedSetups);
taskSummary.append("\t").append(ts.numKilledSetups);
taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff(
dateFormat, ts.setupStarted, 0));
taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff(
dateFormat, ts.setupFinished, ts.setupStarted));
taskSummary.append("\nMap\t").append(ts.totalMaps);
taskSummary.append("\t").append(job.getSucceededMaps());
taskSummary.append("\t\t").append(ts.numFailedMaps);
taskSummary.append("\t").append(ts.numKilledMaps);
taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff(
dateFormat, ts.mapStarted, 0));
taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff(
dateFormat, ts.mapFinished, ts.mapStarted));
taskSummary.append("\nReduce\t").append(ts.totalReduces);
taskSummary.append("\t").append(job.getSucceededReduces());
taskSummary.append("\t\t").append(ts.numFailedReduces);
taskSummary.append("\t").append(ts.numKilledReduces);
taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff(
dateFormat, ts.reduceStarted, 0));
taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff(
dateFormat, ts.reduceFinished, ts.reduceStarted));
taskSummary.append("\nCleanup\t").append(ts.totalCleanups);
taskSummary.append("\t").append(ts.numFinishedCleanups);
taskSummary.append("\t\t").append(ts.numFailedCleanups);
taskSummary.append("\t").append(ts.numKilledCleanups);
taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff(
dateFormat, ts.cleanupStarted, 0));
taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff(
dateFormat, ts.cleanupFinished,
ts.cleanupStarted));
taskSummary.append("\n============================\n");
ps.println(taskSummary);
}
private void printJobAnalysis(PrintStream ps) {
if (job.getJobStatus().equals(
JobStatus.getJobRunState(JobStatus.SUCCEEDED))) {
HistoryViewer.AnalyzedJob avg = new HistoryViewer.AnalyzedJob(job);
ps.println("\nAnalysis");
ps.println("=========");
printAnalysis(ps, avg.getMapTasks(), cMap, "map", avg.getAvgMapTime(),
10);
printLast(ps, avg.getMapTasks(), "map", cFinishMapRed);
if (avg.getReduceTasks().length > 0) {
printAnalysis(ps, avg.getReduceTasks(), cShuffle, "shuffle",
avg.getAvgShuffleTime(), 10);
printLast(ps, avg.getReduceTasks(), "shuffle", cFinishShuffle);
printAnalysis(ps, avg.getReduceTasks(), cReduce, "reduce",
avg.getAvgReduceTime(), 10);
printLast(ps, avg.getReduceTasks(), "reduce", cFinishMapRed);
}
ps.println("=========");
} else {
ps.println("No Analysis available as job did not finish");
}
}
protected void printAnalysis(PrintStream ps,
JobHistoryParser.TaskAttemptInfo[] tasks,
Comparator<JobHistoryParser.TaskAttemptInfo> cmp,
String taskType, long avg, int showTasks) {
Arrays.sort(tasks, cmp);
JobHistoryParser.TaskAttemptInfo min = tasks[tasks.length-1];
StringBuilder details = new StringBuilder();
details.append("\nTime taken by best performing ");
details.append(taskType).append(" task ");
details.append(min.getAttemptId().getTaskID().toString()).append(": ");
if ("map".equals(taskType)) {
details.append(StringUtils.formatTimeDiff(
min.getFinishTime(),
min.getStartTime()));
} else if ("shuffle".equals(taskType)) {
details.append(StringUtils.formatTimeDiff(
min.getShuffleFinishTime(),
min.getStartTime()));
} else {
details.append(StringUtils.formatTimeDiff(
min.getFinishTime(),
min.getShuffleFinishTime()));
}
details.append("\nAverage time taken by ");
details.append(taskType).append(" tasks: ");
details.append(StringUtils.formatTimeDiff(avg, 0));
details.append("\nWorse performing ");
details.append(taskType).append(" tasks: ");
details.append("\nTaskId\t\tTimetaken");
ps.println(details);
for (int i = 0; i < showTasks && i < tasks.length; i++) {
details.setLength(0);
details.append(tasks[i].getAttemptId().getTaskID()).append(" ");
if ("map".equals(taskType)) {
details.append(StringUtils.formatTimeDiff(
tasks[i].getFinishTime(),
tasks[i].getStartTime()));
} else if ("shuffle".equals(taskType)) {
details.append(StringUtils.formatTimeDiff(
tasks[i].getShuffleFinishTime(),
tasks[i].getStartTime()));
} else {
details.append(StringUtils.formatTimeDiff(
tasks[i].getFinishTime(),
tasks[i].getShuffleFinishTime()));
}
ps.println(details);
}
}
protected void printLast(PrintStream ps,
JobHistoryParser.TaskAttemptInfo[] tasks, String taskType,
Comparator<JobHistoryParser.TaskAttemptInfo> cmp) {
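    // Note: the attempts are always sorted with cFinishMapRed (latest finish
    // time first) here; the cmp parameter is not used by this method.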
Arrays.sort(tasks, cFinishMapRed);
JobHistoryParser.TaskAttemptInfo last = tasks[0];
StringBuilder lastBuf = new StringBuilder();
lastBuf.append("The last ").append(taskType);
lastBuf.append(" task ").append(last.getAttemptId().getTaskID());
Long finishTime;
if ("shuffle".equals(taskType)) {
finishTime = last.getShuffleFinishTime();
} else {
finishTime = last.getFinishTime();
}
lastBuf.append(" finished at (relative to the Job launch time): ");
lastBuf.append(StringUtils.getFormattedTimeWithDiff(dateFormat,
finishTime, job.getLaunchTime()));
ps.println(lastBuf);
}
private void printTasks(PrintStream ps, TaskType taskType, String status) {
Map<TaskID, JobHistoryParser.TaskInfo> tasks = job.getAllTasks();
StringBuilder header = new StringBuilder();
header.append("\n").append(status).append(" ");
header.append(taskType).append(" task list for ")
.append(job.getJobId().toString());
header.append("\nTaskId\t\tStartTime\tFinishTime\tError");
if (TaskType.MAP.equals(taskType)) {
header.append("\tInputSplits");
}
header.append("\n====================================================");
StringBuilder taskList = new StringBuilder();
for (JobHistoryParser.TaskInfo task : tasks.values()) {
if (taskType.equals(task.getTaskType()) &&
(status.equals(task.getTaskStatus())
|| status.equalsIgnoreCase("ALL"))) {
taskList.setLength(0);
taskList.append(task.getTaskId());
taskList.append("\t").append(StringUtils.getFormattedTimeWithDiff(
dateFormat, task.getStartTime(), 0));
taskList.append("\t").append(StringUtils.getFormattedTimeWithDiff(
dateFormat, task.getFinishTime(),
task.getStartTime()));
taskList.append("\t").append(task.getError());
if (TaskType.MAP.equals(taskType)) {
taskList.append("\t").append(task.getSplitLocations());
}
if (taskList != null) {
ps.println(header);
ps.println(taskList);
}
}
}
}
private void printFailedAttempts(PrintStream ps,
HistoryViewer.FilteredJob filteredJob) {
Map<String, Set<TaskID>> badNodes = filteredJob.getFilteredMap();
StringBuilder attempts = new StringBuilder();
if (badNodes.size() > 0) {
attempts.append("\n").append(filteredJob.getFilter());
attempts.append(" task attempts by nodes");
attempts.append("\nHostname\tFailedTasks");
attempts.append("\n===============================");
ps.println(attempts);
for (Map.Entry<String, Set<TaskID>> entry : badNodes.entrySet()) {
String node = entry.getKey();
Set<TaskID> failedTasks = entry.getValue();
attempts.setLength(0);
attempts.append(node).append("\t");
for (TaskID t : failedTasks) {
attempts.append(t).append(", ");
}
ps.println(attempts);
}
}
}
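  // Comparators used to rank task attempts: cMap, cShuffle and cReduce order
  // by phase duration (longest first); cFinishShuffle and cFinishMapRed order
  // by phase finish time (latest first).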
private static Comparator<JobHistoryParser.TaskAttemptInfo> cMap =
new Comparator<JobHistoryParser.TaskAttemptInfo>() {
public int compare(JobHistoryParser.TaskAttemptInfo t1,
JobHistoryParser.TaskAttemptInfo t2) {
long l1 = t1.getFinishTime() - t1.getStartTime();
long l2 = t2.getFinishTime() - t2.getStartTime();
return Long.compare(l2, l1);
}
};
private static Comparator<JobHistoryParser.TaskAttemptInfo> cShuffle =
new Comparator<JobHistoryParser.TaskAttemptInfo>() {
public int compare(JobHistoryParser.TaskAttemptInfo t1,
JobHistoryParser.TaskAttemptInfo t2) {
long l1 = t1.getShuffleFinishTime() - t1.getStartTime();
long l2 = t2.getShuffleFinishTime() - t2.getStartTime();
return Long.compare(l2, l1);
}
};
private static Comparator<JobHistoryParser.TaskAttemptInfo> cFinishShuffle =
new Comparator<JobHistoryParser.TaskAttemptInfo>() {
public int compare(JobHistoryParser.TaskAttemptInfo t1,
JobHistoryParser.TaskAttemptInfo t2) {
long l1 = t1.getShuffleFinishTime();
long l2 = t2.getShuffleFinishTime();
return Long.compare(l2, l1);
}
};
private static Comparator<JobHistoryParser.TaskAttemptInfo> cFinishMapRed =
new Comparator<JobHistoryParser.TaskAttemptInfo>() {
public int compare(JobHistoryParser.TaskAttemptInfo t1,
JobHistoryParser.TaskAttemptInfo t2) {
long l1 = t1.getFinishTime();
long l2 = t2.getFinishTime();
return Long.compare(l2, l1);
}
};
private static Comparator<JobHistoryParser.TaskAttemptInfo> cReduce =
new Comparator<JobHistoryParser.TaskAttemptInfo>() {
public int compare(JobHistoryParser.TaskAttemptInfo t1,
JobHistoryParser.TaskAttemptInfo t2) {
long l1 = t1.getFinishTime() - t1.getShuffleFinishTime();
long l2 = t2.getFinishTime() - t2.getShuffleFinishTime();
return Long.compare(l2, l1);
}
};
}
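For reference, here is a minimal sketch of how this printer can be driven. In the normal flow, HistoryViewer parses a job history file and delegates to a HistoryViewerPrinter; the class and its constructors above are package-private, so a standalone caller would have to live in the same package, as assumed below. The history file path is a hypothetical placeholder.

package org.apache.hadoop.mapreduce.jobhistory;

import java.io.IOException;
import java.util.TimeZone;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PrintJobHistoryExample {
  public static void main(String[] args) throws IOException {
    // Hypothetical path to a finished job's .jhist file.
    Path historyFile = new Path("/tmp/job_1700000000000_0001.jhist");
    Configuration conf = new Configuration();
    FileSystem fs = historyFile.getFileSystem(conf);

    // Parse the history file into the JobInfo model consumed by the printer.
    JobHistoryParser parser = new JobHistoryParser(fs, historyFile);
    JobHistoryParser.JobInfo jobInfo = parser.parse();

    // printAll=true also lists successful tasks and every task attempt;
    // "http" is the scheme used when building task log URLs.
    HistoryViewerPrinter printer = new HumanReadableHistoryViewerPrinter(
        jobInfo, true, "http", TimeZone.getDefault());
    printer.print(System.out);
  }
}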