hadoop BlockOperations 源码
hadoop BlockOperations 代码
文件路径:/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/BlockOperations.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.fs.impl.prefetch;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.DoubleSummaryStatistics;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.fs.impl.prefetch.Validate.checkNotNegative;
/**
* Block level operations performed on a file.
* This class is meant to be used by {@code BlockManager}.
* It is separated out in its own file due to its size.
*
* This class is used for debugging/logging. Calls to this class
* can be safely removed without affecting the overall operation.
*/
public final class BlockOperations {
private static final Logger LOG = LoggerFactory.getLogger(BlockOperations.class);
/**
 * Operation kind.
 *
 * Each constant carries a short name (used by the compact summary format),
 * a longer name (used by the debug output) and a flag saying whether the
 * operation refers to a specific block number.
 */
public enum Kind {
  UNKNOWN("??", "unknown", false),
  CANCEL_PREFETCHES("CP", "cancelPrefetches", false),
  CLOSE("CX", "close", false),
  CACHE_PUT("C+", "putC", true),
  GET_CACHED("GC", "getCached", true),
  GET_PREFETCHED("GP", "getPrefetched", true),
  GET_READ("GR", "getRead", true),
  PREFETCH("PF", "prefetch", true),
  RELEASE("RL", "release", true),
  REQUEST_CACHING("RC", "requestCaching", true),
  REQUEST_PREFETCH("RP", "requestPrefetch", true);

  private final String shortName;
  private final String name;
  private final boolean hasBlock;

  // Short name -> kind lookup table. It is filled exactly once, in the
  // static initializer below, and never modified afterwards. Filling it
  // lazily inside fromShortName() would let two threads mutate the HashMap
  // concurrently, or let one thread observe a non-empty but incomplete map.
  private static final Map<String, Kind> SHORT_NAME_TO_KIND = new HashMap<>();

  static {
    // Enum constants are constructed before this block runs, so values()
    // already returns every constant here.
    for (Kind kind : values()) {
      SHORT_NAME_TO_KIND.put(kind.shortName, kind);
    }
  }

  Kind(String shortName, String name, boolean hasBlock) {
    this.shortName = shortName;
    this.name = name;
    this.hasBlock = hasBlock;
  }

  /**
   * Looks up a kind by its short name.
   *
   * @param shortName the short name, e.g. {@code "GC"}.
   * @return the matching kind, or {@code null} if no kind has that short name.
   */
  public static Kind fromShortName(String shortName) {
    return SHORT_NAME_TO_KIND.get(shortName);
  }
}
/**
 * A single recorded operation: its kind, the block number it was created
 * with, and the {@code System.nanoTime()} reading taken at construction.
 */
public static class Operation {
  private final Kind kind;
  private final int blockNumber;
  private final long timestamp;

  /**
   * Creates an operation and stamps it with the current
   * {@code System.nanoTime()} value.
   *
   * @param kind the kind of operation.
   * @param blockNumber the block number stored with the operation.
   */
  public Operation(Kind kind, int blockNumber) {
    this.kind = kind;
    this.blockNumber = blockNumber;
    this.timestamp = System.nanoTime();
  }

  /** @return the kind of this operation. */
  public Kind getKind() {
    return kind;
  }

  /** @return the block number this operation was created with. */
  public int getBlockNumber() {
    return blockNumber;
  }

  /** @return the {@code System.nanoTime()} value captured at construction. */
  public long getTimestamp() {
    return timestamp;
  }

  /**
   * Appends the compact form of this operation to the given builder:
   * the short name, followed by the block number in parentheses when the
   * kind has a block.
   *
   * @param sb the builder to append to.
   */
  public void getSummary(StringBuilder sb) {
    String compact = kind.hasBlock
        ? String.format("%s(%d)", kind.shortName, blockNumber)
        : String.format("%s", kind.shortName);
    sb.append(compact);
  }

  /**
   * Returns the verbose, single-line form of this operation.
   *
   * @return the kind's long name with a three-character prefix; the block
   *         number is included only when the kind has a block.
   */
  public String getDebugInfo() {
    if (!kind.hasBlock) {
      return String.format("... %s()", kind.name);
    }
    return String.format("--- %s(%d)", kind.name, blockNumber);
  }
}
/**
 * Marks the end of a previously created operation. It copies the kind and
 * block number of that operation and keeps a reference to it so that the
 * elapsed time between the two can be computed.
 */
public static class End extends Operation {
  // The operation whose completion this object marks.
  private final Operation startOp;

  /**
   * @param op the operation that has finished.
   */
  public End(Operation op) {
    super(op.kind, op.blockNumber);
    this.startOp = op;
  }

  /**
   * Appends {@code "E"} followed by the regular compact form.
   *
   * @param sb the builder to append to.
   */
  @Override
  public void getSummary(StringBuilder sb) {
    sb.append("E");
    super.getSummary(sb);
  }

  /**
   * @return the regular debug line with its first three characters
   *         replaced by {@code "***"}.
   */
  @Override
  public String getDebugInfo() {
    String regular = super.getDebugInfo();
    return "***" + regular.substring(3);
  }

  /**
   * @return the difference between this object's timestamp and the start
   *         operation's timestamp, divided by 1e9 (nanoseconds to seconds).
   */
  public double duration() {
    long elapsedNanos = getTimestamp() - startOp.getTimestamp();
    return elapsedNanos / 1e9;
  }
}
// All operations recorded so far, in the order they were added.
private final ArrayList<Operation> ops = new ArrayList<>();

// When true, every added operation is also written to the log.
private boolean debugMode;

/**
 * Creates an instance with an empty operation list and debug mode off.
 */
public BlockOperations() {
}
/**
 * Turns logging of each added operation on or off.
 *
 * @param state true to log every operation as it is added.
 */
public synchronized void setDebug(boolean state) {
  this.debugMode = state;
}
/**
 * Appends the given operation to the recorded list, logging its debug
 * line first when debug mode is on.
 *
 * @param op the operation to record.
 * @return the same operation, so callers can return it directly.
 */
private synchronized Operation add(Operation op) {
  if (this.debugMode) {
    String debugLine = op.getDebugInfo();
    LOG.info(debugLine);
  }

  this.ops.add(op);
  return op;
}
/**
 * Records a {@code GET_PREFETCHED} operation.
 *
 * @param blockNumber the block number; checked with checkNotNegative first.
 * @return the recorded operation.
 */
public Operation getPrefetched(int blockNumber) {
  checkNotNegative(blockNumber, "blockNumber");

  Operation recorded = new Operation(Kind.GET_PREFETCHED, blockNumber);
  return add(recorded);
}
/**
 * Records a {@code GET_CACHED} operation.
 *
 * @param blockNumber the block number; checked with checkNotNegative first.
 * @return the recorded operation.
 */
public Operation getCached(int blockNumber) {
  checkNotNegative(blockNumber, "blockNumber");

  Operation recorded = new Operation(Kind.GET_CACHED, blockNumber);
  return add(recorded);
}
/**
 * Records a {@code GET_READ} operation.
 *
 * @param blockNumber the block number; checked with checkNotNegative first.
 * @return the recorded operation.
 */
public Operation getRead(int blockNumber) {
  checkNotNegative(blockNumber, "blockNumber");

  Operation recorded = new Operation(Kind.GET_READ, blockNumber);
  return add(recorded);
}
/**
 * Records a {@code RELEASE} operation.
 *
 * @param blockNumber the block number; checked with checkNotNegative first.
 * @return the recorded operation.
 */
public Operation release(int blockNumber) {
  checkNotNegative(blockNumber, "blockNumber");

  Operation recorded = new Operation(Kind.RELEASE, blockNumber);
  return add(recorded);
}
/**
 * Records a {@code REQUEST_PREFETCH} operation.
 *
 * @param blockNumber the block number; checked with checkNotNegative first.
 * @return the recorded operation.
 */
public Operation requestPrefetch(int blockNumber) {
  checkNotNegative(blockNumber, "blockNumber");

  Operation recorded = new Operation(Kind.REQUEST_PREFETCH, blockNumber);
  return add(recorded);
}
/**
 * Records a {@code PREFETCH} operation.
 *
 * @param blockNumber the block number; checked with checkNotNegative first.
 * @return the recorded operation.
 */
public Operation prefetch(int blockNumber) {
  checkNotNegative(blockNumber, "blockNumber");

  Operation recorded = new Operation(Kind.PREFETCH, blockNumber);
  return add(recorded);
}
/**
 * Records a {@code CANCEL_PREFETCHES} operation. This kind has no block,
 * so the operation is created with a block number of -1.
 *
 * @return the recorded operation.
 */
public Operation cancelPrefetches() {
  Operation recorded = new Operation(Kind.CANCEL_PREFETCHES, -1);
  return add(recorded);
}
/**
 * Records a {@code CLOSE} operation. This kind has no block, so the
 * operation is created with a block number of -1.
 *
 * @return the recorded operation.
 */
public Operation close() {
  Operation recorded = new Operation(Kind.CLOSE, -1);
  return add(recorded);
}
/**
 * Records a {@code REQUEST_CACHING} operation.
 *
 * @param blockNumber the block number; checked with checkNotNegative first.
 * @return the recorded operation.
 */
public Operation requestCaching(int blockNumber) {
  checkNotNegative(blockNumber, "blockNumber");

  Operation recorded = new Operation(Kind.REQUEST_CACHING, blockNumber);
  return add(recorded);
}
/**
 * Records a {@code CACHE_PUT} operation.
 *
 * @param blockNumber the block number; checked with checkNotNegative first.
 * @return the recorded operation.
 */
public Operation addToCache(int blockNumber) {
  checkNotNegative(blockNumber, "blockNumber");

  Operation recorded = new Operation(Kind.CACHE_PUT, blockNumber);
  return add(recorded);
}
/**
 * Records the end of the given operation.
 *
 * @param op the operation that has finished.
 * @return the recorded {@code End} marker.
 */
public Operation end(Operation op) {
  Operation marker = new End(op);
  return add(marker);
}
/**
 * Formats the arguments with {@code String.format} and appends the result
 * to the builder.
 *
 * @param sb the builder to append to.
 * @param format the format string.
 * @param args the format arguments.
 */
private static void append(StringBuilder sb, String format, Object... args) {
  String formatted = String.format(format, args);
  sb.append(formatted);
}
/**
 * Renders all recorded operations followed by the duration report.
 *
 * @param showDebugInfo when true, each operation is written as its debug
 *        line followed by a newline; otherwise as its compact form
 *        followed by {@code ";"}.
 * @return the rendered text.
 */
public synchronized String getSummary(boolean showDebugInfo) {
  StringBuilder out = new StringBuilder();

  for (Operation op : ops) {
    if (op == null) {
      continue;
    }

    if (!showDebugInfo) {
      op.getSummary(out);
      out.append(";");
      continue;
    }

    out.append(op.getDebugInfo()).append("\n");
  }

  // A newline separates the operation listing from the duration report.
  out.append("\n");
  getDurationInfo(out);

  return out.toString();
}
/**
 * Appends one line per reported kind with count, total, min, average and
 * max of the durations of its {@code End} operations. Kinds for which no
 * {@code End} operation was recorded get {@code "--"}.
 *
 * @param sb the builder to append to.
 */
public synchronized void getDurationInfo(StringBuilder sb) {
  // Collect the duration of every End operation, grouped by kind.
  Map<Kind, DoubleSummaryStatistics> statsByKind = new HashMap<>();
  for (Operation op : ops) {
    if (!(op instanceof End)) {
      continue;
    }
    End finished = (End) op;
    statsByKind
        .computeIfAbsent(finished.getKind(), k -> new DoubleSummaryStatistics())
        .accept(finished.duration());
  }

  // The report lists kinds in this fixed order; UNKNOWN is not reported.
  Kind[] reportOrder = {
      Kind.GET_CACHED,
      Kind.GET_PREFETCHED,
      Kind.GET_READ,
      Kind.CACHE_PUT,
      Kind.PREFETCH,
      Kind.REQUEST_CACHING,
      Kind.REQUEST_PREFETCH,
      Kind.CANCEL_PREFETCHES,
      Kind.RELEASE,
      Kind.CLOSE
  };

  for (Kind kind : reportOrder) {
    append(sb, "%-18s : ", kind);

    DoubleSummaryStatistics stats = statsByKind.get(kind);
    if (stats != null) {
      append(
          sb,
          "#ops = %3d, total = %5.1f, min: %3.1f, avg: %3.1f, max: %3.1f\n",
          stats.getCount(),
          stats.getSum(),
          stats.getMin(),
          stats.getAverage(),
          stats.getMax());
    } else {
      append(sb, "--\n");
    }
  }
}
/**
 * Appends a per-block consistency report to the given builder.
 *
 * For every block number that is not negative it reports: kinds whose
 * number of start operations differs from the number of {@code End}
 * operations, and kinds started more than once. It then lists blocks with
 * more {@code PREFETCH} than {@code GET_PREFETCHED} operations and blocks
 * with more {@code CACHE_PUT} than {@code GET_CACHED} operations.
 *
 * @param sb the builder to append to.
 */
public synchronized void analyze(StringBuilder sb) {
  // Group operations by block number; operations without a block are skipped.
  Map<Integer, List<Operation>> opsByBlock = new HashMap<>();
  for (Operation op : ops) {
    if (op.blockNumber >= 0) {
      opsByBlock.computeIfAbsent(op.blockNumber, b -> new ArrayList<>()).add(op);
    }
  }

  List<Integer> prefetchedNotUsed = new ArrayList<>();
  List<Integer> cachedNotUsed = new ArrayList<>();

  for (Map.Entry<Integer, List<Operation>> blockEntry : opsByBlock.entrySet()) {
    Integer blockNumber = blockEntry.getKey();

    // Count start operations and End operations separately, per kind.
    Map<Kind, Integer> startCounts = new HashMap<>();
    Map<Kind, Integer> endCounts = new HashMap<>();
    for (Operation op : blockEntry.getValue()) {
      Map<Kind, Integer> target = (op instanceof End) ? endCounts : startCounts;
      target.merge(op.kind, 1, Integer::sum);
    }

    for (Map.Entry<Kind, Integer> started : startCounts.entrySet()) {
      Kind kind = started.getKey();
      int count = started.getValue();
      int endCount = endCounts.getOrDefault(kind, 0);

      if (count != endCount) {
        append(sb, "[%d] %s : #ops(%d) != #end-ops(%d)\n", blockNumber, kind, count, endCount);
      }
      if (count > 1) {
        append(sb, "[%d] %s = %d\n", blockNumber, kind, count);
      }
    }

    int prefetchCount = startCounts.getOrDefault(Kind.PREFETCH, 0);
    int getPrefetchedCount = startCounts.getOrDefault(Kind.GET_PREFETCHED, 0);
    boolean prefetchWasted = (prefetchCount > 0) && (getPrefetchedCount < prefetchCount);
    if (prefetchWasted) {
      prefetchedNotUsed.add(blockNumber);
    }

    int cacheCount = startCounts.getOrDefault(Kind.CACHE_PUT, 0);
    int getCachedCount = startCounts.getOrDefault(Kind.GET_CACHED, 0);
    boolean cacheWasted = (cacheCount > 0) && (getCachedCount < cacheCount);
    if (cacheWasted) {
      cachedNotUsed.add(blockNumber);
    }
  }

  if (!prefetchedNotUsed.isEmpty()) {
    append(sb, "Prefetched but not used: %s\n", getIntList(prefetchedNotUsed));
  }
  if (!cachedNotUsed.isEmpty()) {
    append(sb, "Cached but not used: %s\n", getIntList(cachedNotUsed));
  }
}
/**
 * Joins the given integers into a single string separated by
 * {@code ", "}.
 *
 * @param nums the integers to join.
 * @return the joined text; empty when {@code nums} has no elements.
 */
private static String getIntList(Iterable<Integer> nums) {
  StringBuilder joined = new StringBuilder();
  for (Integer n : nums) {
    // Integer text is never empty, so a non-empty builder means at least
    // one element was already written and a separator is needed.
    if (joined.length() > 0) {
      joined.append(", ");
    }
    joined.append(n.toString());
  }
  return joined.toString();
}
/**
 * Rebuilds a {@code BlockOperations} instance from a compact summary.
 *
 * The summary is split on {@code ";"}. Each token must be a short name made
 * of upper-case letters and {@code '+'}, optionally followed by a
 * parenthesised block number, for example {@code "GP(3)"} or {@code "CX"}.
 * A token whose short name is not a known kind but starts with {@code 'E'}
 * is treated as the end of the most recently added operation of the
 * remaining short name and the same block number. If no such start
 * operation exists, a warning is logged and the token is skipped.
 *
 * The returned instance has debug mode switched on, so every operation
 * added while parsing is also logged.
 *
 * @param summary the compact summary text.
 * @return the reconstructed operations.
 * @throws IllegalArgumentException if a token does not match the expected
 *         format or names an unknown kind.
 */
public static BlockOperations fromSummary(String summary) {
  BlockOperations ops = new BlockOperations();
  ops.setDebug(true);
  Pattern blockOpPattern = Pattern.compile("([A-Z+]+)(\\(([0-9]+)?\\))?");
  String[] tokens = summary.split(";");
  for (String token : tokens) {
    Matcher matcher = blockOpPattern.matcher(token);
    if (!matcher.matches()) {
      String message = String.format("Unknown summary format: %s", token);
      throw new IllegalArgumentException(message);
    }

    String shortName = matcher.group(1);
    String blockNumberStr = matcher.group(3);
    // Kinds without a block are written without a number; they use -1.
    int blockNumber = (blockNumberStr == null) ? -1 : Integer.parseInt(blockNumberStr);

    Kind kind = Kind.fromShortName(shortName);
    Kind endKind = null;
    if (kind == null && shortName.charAt(0) == 'E') {
      // Not a plain short name: try it as "E" + short name (an end marker).
      endKind = Kind.fromShortName(shortName.substring(1));
    }

    if (kind == null && endKind == null) {
      String message = String.format("Unknown short name: %s (token = %s)", shortName, token);
      throw new IllegalArgumentException(message);
    }

    if (kind != null) {
      ops.add(new Operation(kind, blockNumber));
      continue;
    }

    // Search backwards for the start operation this end marker belongs to.
    // The match is kept in its own variable: reusing the loop cursor for
    // the "not found" check would leave it non-null after any unsuccessful
    // search over a non-empty list, and the warning would never be logged.
    Operation startOp = null;
    for (int i = ops.ops.size() - 1; i >= 0; i--) {
      Operation candidate = ops.ops.get(i);
      if ((candidate.blockNumber == blockNumber)
          && (candidate.kind == endKind)
          && !(candidate instanceof End)) {
        startOp = candidate;
        break;
      }
    }

    if (startOp == null) {
      LOG.warn("Start op not found: {}({})", endKind, blockNumber);
    } else {
      ops.add(new End(startOp));
    }
  }
  return ops;
}
}
相关信息
相关文章
hadoop EmptyPrefetchingStatistics 源码
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦