hadoop ChecksumFs 源码
haddop ChecksumFs 代码
文件路径:/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFs.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.NoSuchElementException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Options.ChecksumOpt;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Progressable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Abstract Checksumed Fs.
* It provide a basic implementation of a Checksumed Fs,
* which creates a checksum file for each raw file.
* It generates & verifies checksums at the client side.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving /*Evolving for a release,to be changed to Stable */
public abstract class ChecksumFs extends FilterFs {
private static final byte[] CHECKSUM_VERSION = new byte[] {'c', 'r', 'c', 0};
private int defaultBytesPerChecksum = 512;
private boolean verifyChecksum = true;
public static double getApproxChkSumLength(long size) {
return ChecksumFSOutputSummer.CHKSUM_AS_FRACTION * size;
}
public ChecksumFs(AbstractFileSystem theFs)
throws IOException, URISyntaxException {
super(theFs);
defaultBytesPerChecksum =
getMyFs().getServerDefaults(new Path("/")).getBytesPerChecksum();
}
/**
* Set whether to verify checksum.
*/
@Override
public void setVerifyChecksum(boolean inVerifyChecksum) {
this.verifyChecksum = inVerifyChecksum;
}
/**
* get the raw file system.
*
* @return abstract file system.
*/
public AbstractFileSystem getRawFs() {
return getMyFs();
}
/**
* Return the name of the checksum file associated with a file.
*
* @param file the file path.
* @return the checksum file associated with a file.
*/
public Path getChecksumFile(Path file) {
return new Path(file.getParent(), "." + file.getName() + ".crc");
}
/**
* Return true iff file is a checksum file name.
*
* @param file the file path.
* @return if is checksum file true,not false.
*/
public static boolean isChecksumFile(Path file) {
String name = file.getName();
return name.startsWith(".") && name.endsWith(".crc");
}
/**
* Return the length of the checksum file given the size of the
* actual file.
*
* @param file the file path.
* @param fileSize file size.
* @return check sum file length.
*/
public long getChecksumFileLength(Path file, long fileSize) {
return getChecksumLength(fileSize, getBytesPerSum());
}
/**
* Return the bytes Per Checksum.
*
* @return bytes per sum.
*/
public int getBytesPerSum() {
return defaultBytesPerChecksum;
}
private int getSumBufferSize(int bytesPerSum, int bufferSize, Path file)
throws IOException {
int defaultBufferSize = getMyFs().getServerDefaults(file)
.getFileBufferSize();
int proportionalBufferSize = bufferSize / bytesPerSum;
return Math.max(bytesPerSum,
Math.max(proportionalBufferSize, defaultBufferSize));
}
/*******************************************************
* For open()'s FSInputStream
* It verifies that data matches checksums.
*******************************************************/
private static class ChecksumFSInputChecker extends FSInputChecker {
public static final Logger LOG =
LoggerFactory.getLogger(FSInputChecker.class);
private static final int HEADER_LENGTH = 8;
private ChecksumFs fs;
private FSDataInputStream datas;
private FSDataInputStream sums;
private int bytesPerSum = 1;
private long fileLen = -1L;
public ChecksumFSInputChecker(ChecksumFs fs, Path file)
throws IOException, UnresolvedLinkException {
this(fs, file, fs.getServerDefaults(file).getFileBufferSize());
}
public ChecksumFSInputChecker(ChecksumFs fs, Path file, int bufferSize)
throws IOException, UnresolvedLinkException {
super(file, fs.getFileStatus(file).getReplication());
this.datas = fs.getRawFs().open(file, bufferSize);
this.fs = fs;
Path sumFile = fs.getChecksumFile(file);
try {
int sumBufferSize = fs.getSumBufferSize(fs.getBytesPerSum(),
bufferSize, file);
sums = fs.getRawFs().open(sumFile, sumBufferSize);
byte[] version = new byte[CHECKSUM_VERSION.length];
sums.readFully(version);
if (!Arrays.equals(version, CHECKSUM_VERSION)) {
throw new IOException("Not a checksum file: "+sumFile);
}
this.bytesPerSum = sums.readInt();
set(fs.verifyChecksum, DataChecksum.newCrc32(), bytesPerSum, 4);
} catch (FileNotFoundException e) { // quietly ignore
set(fs.verifyChecksum, null, 1, 0);
} catch (IOException e) { // loudly ignore
LOG.warn("Problem opening checksum file: "+ file +
". Ignoring exception: " , e);
set(fs.verifyChecksum, null, 1, 0);
}
}
private long getChecksumFilePos(long dataPos) {
return HEADER_LENGTH + 4*(dataPos/bytesPerSum);
}
@Override
protected long getChunkPosition(long dataPos) {
return dataPos/bytesPerSum*bytesPerSum;
}
@Override
public int available() throws IOException {
return datas.available() + super.available();
}
@Override
public int read(long position, byte[] b, int off, int len)
throws IOException, UnresolvedLinkException {
// parameter check
validatePositionedReadArgs(position, b, off, len);
if (len == 0) {
return 0;
}
int nread;
try (ChecksumFSInputChecker checker =
new ChecksumFSInputChecker(fs, file)) {
checker.seek(position);
nread = checker.read(b, off, len);
}
return nread;
}
@Override
public void close() throws IOException {
datas.close();
if (sums != null) {
sums.close();
}
set(fs.verifyChecksum, null, 1, 0);
}
@Override
public boolean seekToNewSource(long targetPos) throws IOException {
final long sumsPos = getChecksumFilePos(targetPos);
fs.reportChecksumFailure(file, datas, targetPos, sums, sumsPos);
final boolean newDataSource = datas.seekToNewSource(targetPos);
return sums.seekToNewSource(sumsPos) || newDataSource;
}
@Override
protected int readChunk(long pos, byte[] buf, int offset, int len,
byte[] checksum) throws IOException {
boolean eof = false;
if (needChecksum()) {
assert checksum != null; // we have a checksum buffer
assert checksum.length % CHECKSUM_SIZE == 0; // it is sane length
assert len >= bytesPerSum; // we must read at least one chunk
final int checksumsToRead = Math.min(
len/bytesPerSum, // number of checksums based on len to read
checksum.length / CHECKSUM_SIZE); // size of checksum buffer
long checksumPos = getChecksumFilePos(pos);
if(checksumPos != sums.getPos()) {
sums.seek(checksumPos);
}
int sumLenRead = sums.read(checksum, 0, CHECKSUM_SIZE * checksumsToRead);
if (sumLenRead >= 0 && sumLenRead % CHECKSUM_SIZE != 0) {
throw new EOFException("Checksum file not a length multiple of checksum size " +
"in " + file + " at " + pos + " checksumpos: " + checksumPos +
" sumLenread: " + sumLenRead );
}
if (sumLenRead <= 0) { // we're at the end of the file
eof = true;
} else {
// Adjust amount of data to read based on how many checksum chunks we read
len = Math.min(len, bytesPerSum * (sumLenRead / CHECKSUM_SIZE));
}
}
if (pos != datas.getPos()) {
datas.seek(pos);
}
int nread = readFully(datas, buf, offset, len);
if (eof && nread > 0) {
throw new ChecksumException("Checksum error: "+file+" at "+pos, pos);
}
return nread;
}
/* Return the file length */
private long getFileLength() throws IOException, UnresolvedLinkException {
if (fileLen==-1L) {
fileLen = fs.getFileStatus(file).getLen();
}
return fileLen;
}
/**
* Skips over and discards <code>n</code> bytes of data from the
* input stream.
*
* The <code>skip</code> method skips over some smaller number of bytes
* when reaching end of file before <code>n</code> bytes have been skipped.
* The actual number of bytes skipped is returned. If <code>n</code> is
* negative, no bytes are skipped.
*
* @param n the number of bytes to be skipped.
* @return the actual number of bytes skipped.
* @exception IOException if an I/O error occurs.
* ChecksumException if the chunk to skip to is corrupted
*/
@Override
public synchronized long skip(long n) throws IOException {
final long curPos = getPos();
final long fileLength = getFileLength();
if (n+curPos > fileLength) {
n = fileLength - curPos;
}
return super.skip(n);
}
/**
* Seek to the given position in the stream.
* The next read() will be from that position.
*
* <p>This method does not allow seek past the end of the file.
* This produces IOException.
*
* @param pos the postion to seek to.
* @exception IOException if an I/O error occurs or seeks after EOF
* ChecksumException if the chunk to seek to is corrupted
*/
@Override
public synchronized void seek(long pos) throws IOException {
if (pos>getFileLength()) {
throw new IOException("Cannot seek after EOF");
}
super.seek(pos);
}
}
@Override
public boolean truncate(Path f, long newLength) throws IOException {
throw new UnsupportedOperationException("Truncate is not supported "
+ "by ChecksumFs");
}
/**
* Opens an FSDataInputStream at the indicated Path.
* @param f the file name to open
* @param bufferSize the size of the buffer to be used.
*/
@Override
public FSDataInputStream open(Path f, int bufferSize)
throws IOException, UnresolvedLinkException {
return new FSDataInputStream(
new ChecksumFSInputChecker(this, f, bufferSize));
}
/**
* Calculated the length of the checksum file in bytes.
* @param size the length of the data file in bytes
* @param bytesPerSum the number of bytes in a checksum block
* @return the number of bytes in the checksum file
*/
public static long getChecksumLength(long size, int bytesPerSum) {
//the checksum length is equal to size passed divided by bytesPerSum +
//bytes written in the beginning of the checksum file.
return ((size + bytesPerSum - 1) / bytesPerSum) * 4 +
CHECKSUM_VERSION.length + 4;
}
/** This class provides an output stream for a checksummed file.
* It generates checksums for data. */
private static class ChecksumFSOutputSummer extends FSOutputSummer {
private FSDataOutputStream datas;
private FSDataOutputStream sums;
private static final float CHKSUM_AS_FRACTION = 0.01f;
private boolean isClosed = false;
public ChecksumFSOutputSummer(final ChecksumFs fs, final Path file,
final EnumSet<CreateFlag> createFlag,
final FsPermission absolutePermission, final int bufferSize,
final short replication, final long blockSize,
final Progressable progress, final ChecksumOpt checksumOpt,
final boolean createParent) throws IOException {
super(DataChecksum.newDataChecksum(DataChecksum.Type.CRC32,
fs.getBytesPerSum()));
// checksumOpt is passed down to the raw fs. Unless it implements
// checksum impelemts internally, checksumOpt will be ignored.
// If the raw fs does checksum internally, we will end up with
// two layers of checksumming. i.e. checksumming checksum file.
this.datas = fs.getRawFs().createInternal(file, createFlag,
absolutePermission, bufferSize, replication, blockSize, progress,
checksumOpt, createParent);
// Now create the chekcsumfile; adjust the buffsize
int bytesPerSum = fs.getBytesPerSum();
int sumBufferSize = fs.getSumBufferSize(bytesPerSum, bufferSize, file);
this.sums = fs.getRawFs().createInternal(fs.getChecksumFile(file),
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
absolutePermission, sumBufferSize, replication, blockSize, progress,
checksumOpt, createParent);
sums.write(CHECKSUM_VERSION, 0, CHECKSUM_VERSION.length);
sums.writeInt(bytesPerSum);
}
@Override
public void close() throws IOException {
try {
flushBuffer();
sums.close();
datas.close();
} finally {
isClosed = true;
}
}
@Override
protected void writeChunk(byte[] b, int offset, int len, byte[] checksum,
int ckoff, int cklen)
throws IOException {
datas.write(b, offset, len);
sums.write(checksum, ckoff, cklen);
}
@Override
protected void checkClosed() throws IOException {
if (isClosed) {
throw new ClosedChannelException();
}
}
}
@Override
public FSDataOutputStream createInternal(Path f,
EnumSet<CreateFlag> createFlag, FsPermission absolutePermission,
int bufferSize, short replication, long blockSize, Progressable progress,
ChecksumOpt checksumOpt, boolean createParent) throws IOException {
final FSDataOutputStream out = new FSDataOutputStream(
new ChecksumFSOutputSummer(this, f, createFlag, absolutePermission,
bufferSize, replication, blockSize, progress,
checksumOpt, createParent), null);
return out;
}
/** Check if exists.
* @param f source file
*/
private boolean exists(Path f)
throws IOException, UnresolvedLinkException {
try {
return getMyFs().getFileStatus(f) != null;
} catch (FileNotFoundException e) {
return false;
}
}
/** True iff the named path is a directory.
* Note: Avoid using this method. Instead reuse the FileStatus
* returned by getFileStatus() or listStatus() methods.
*/
private boolean isDirectory(Path f)
throws IOException, UnresolvedLinkException {
try {
return getMyFs().getFileStatus(f).isDirectory();
} catch (FileNotFoundException e) {
return false; // f does not exist
}
}
/**
* Set replication for an existing file.
* Implement the abstract <tt>setReplication</tt> of <tt>FileSystem</tt>
* @param src file name
* @param replication new replication
* @throws IOException if an I/O error occurs.
* @return true if successful;
* false if file does not exist or is a directory
*/
@Override
public boolean setReplication(Path src, short replication)
throws IOException, UnresolvedLinkException {
boolean value = getMyFs().setReplication(src, replication);
if (!value) {
return false;
}
Path checkFile = getChecksumFile(src);
if (exists(checkFile)) {
getMyFs().setReplication(checkFile, replication);
}
return true;
}
/**
* Rename files/dirs.
*/
@Override
public void renameInternal(Path src, Path dst)
throws IOException, UnresolvedLinkException {
if (isDirectory(src)) {
getMyFs().rename(src, dst);
} else {
getMyFs().rename(src, dst);
Path checkFile = getChecksumFile(src);
if (exists(checkFile)) { //try to rename checksum
if (isDirectory(dst)) {
getMyFs().rename(checkFile, dst);
} else {
getMyFs().rename(checkFile, getChecksumFile(dst));
}
}
}
}
@Override
public void renameInternal(Path src, Path dst, boolean overwrite)
throws AccessControlException, FileAlreadyExistsException,
FileNotFoundException, ParentNotDirectoryException,
UnresolvedLinkException, IOException {
Options.Rename renameOpt = Options.Rename.NONE;
if (overwrite) {
renameOpt = Options.Rename.OVERWRITE;
}
if (isDirectory(src)) {
getMyFs().rename(src, dst, renameOpt);
} else {
getMyFs().rename(src, dst, renameOpt);
Path checkFile = getChecksumFile(src);
if (exists(checkFile)) { //try to rename checksum
if (isDirectory(dst)) {
getMyFs().rename(checkFile, dst, renameOpt);
} else {
getMyFs().rename(checkFile, getChecksumFile(dst), renameOpt);
}
}
}
}
/**
* Implement the delete(Path, boolean) in checksum
* file system.
*/
@Override
public boolean delete(Path f, boolean recursive)
throws IOException, UnresolvedLinkException {
FileStatus fstatus = null;
try {
fstatus = getMyFs().getFileStatus(f);
} catch(FileNotFoundException e) {
return false;
}
if (fstatus.isDirectory()) {
//this works since the crcs are in the same
//directories and the files. so we just delete
//everything in the underlying filesystem
return getMyFs().delete(f, recursive);
} else {
Path checkFile = getChecksumFile(f);
if (exists(checkFile)) {
getMyFs().delete(checkFile, true);
}
return getMyFs().delete(f, true);
}
}
/**
* Report a checksum error to the file system.
* @param f the file name containing the error
* @param in the stream open on the file
* @param inPos the position of the beginning of the bad data in the file
* @param sums the stream open on the checksum file
* @param sumsPos the position of the beginning of the bad data in the
* checksum file
* @return if retry is necessary
*/
public boolean reportChecksumFailure(Path f, FSDataInputStream in,
long inPos, FSDataInputStream sums, long sumsPos) {
return false;
}
@Override
public FileStatus[] listStatus(Path f) throws IOException,
UnresolvedLinkException {
ArrayList<FileStatus> results = new ArrayList<FileStatus>();
FileStatus[] listing = getMyFs().listStatus(f);
if (listing != null) {
for (int i = 0; i < listing.length; i++) {
if (!isChecksumFile(listing[i].getPath())) {
results.add(listing[i]);
}
}
}
return results.toArray(new FileStatus[results.size()]);
}
@Override
public RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f)
throws AccessControlException, FileNotFoundException,
UnresolvedLinkException, IOException {
final RemoteIterator<LocatedFileStatus> iter =
getMyFs().listLocatedStatus(f);
return new RemoteIterator<LocatedFileStatus>() {
private LocatedFileStatus next = null;
@Override
public boolean hasNext() throws IOException {
while (next == null && iter.hasNext()) {
LocatedFileStatus unfilteredNext = iter.next();
if (!isChecksumFile(unfilteredNext.getPath())) {
next = unfilteredNext;
}
}
return next != null;
}
@Override
public LocatedFileStatus next() throws IOException {
if (!hasNext()) {
throw new NoSuchElementException();
}
LocatedFileStatus tmp = next;
next = null;
return tmp;
}
};
}
}
相关信息
相关文章
hadoop BatchListingOperations 源码
hadoop BatchedRemoteIterator 源码
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦