hadoop Display 源码

  • 2022-10-20
  • 浏览 (487)

haddop Display 代码

文件路径:/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Display.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.fs.shell;

import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedList;
import java.util.zip.GZIPInputStream;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.FileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.io.JsonEncoder;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.AvroFSInput;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIsDirectoryException;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;

/**
 * Display contents or checksums of files 
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving

class Display extends FsCommand {
  public static void registerCommands(CommandFactory factory) {
    factory.addClass(Cat.class, "-cat");
    factory.addClass(Text.class, "-text");
    factory.addClass(Checksum.class, "-checksum");
  }

  /**
   * Displays file content to stdout
   */
  public static class Cat extends Display {
    public static final String NAME = "cat";
    public static final String USAGE = "[-ignoreCrc] <src> ...";
    public static final String DESCRIPTION =
      "Fetch all files that match the file pattern <src> " +
      "and display their content on stdout.\n";

    private boolean verifyChecksum = true;

    @Override
    protected void processOptions(LinkedList<String> args)
    throws IOException {
      CommandFormat cf = new CommandFormat(1, Integer.MAX_VALUE, "ignoreCrc");
      cf.parse(args);
      verifyChecksum = !cf.getOpt("ignoreCrc");
    }

    @Override
    protected void processPath(PathData item) throws IOException {
      if (item.stat.isDirectory()) {
        throw new PathIsDirectoryException(item.toString());
      }
      
      item.fs.setVerifyChecksum(verifyChecksum);
      printToStdout(getInputStream(item));
    }

    private void printToStdout(InputStream in) throws IOException {
      try {
        IOUtils.copyBytes(in, out, getConf(), false);
      } finally {
        in.close();
      }
    }

    protected InputStream getInputStream(PathData item) throws IOException {
      // Always do sequential reads;
      return item.openForSequentialIO();
    }
  }
  
  /**
   * Same behavior as "-cat", but handles zip and TextRecordInputStream
   * and Avro encodings. 
   */ 
  public static class Text extends Cat {
    public static final String NAME = "text";
    public static final String USAGE = Cat.USAGE;
    public static final String DESCRIPTION =
      "Takes a source file and outputs the file in text format.\n" +
      "The allowed formats are zip and TextRecordInputStream and Avro.";
    
    @Override
    protected InputStream getInputStream(PathData item) throws IOException {
      FSDataInputStream i = (FSDataInputStream)super.getInputStream(item);

      // Handle 0 and 1-byte files
      short leadBytes;
      try {
        leadBytes = i.readShort();
      } catch (EOFException e) {
        i.seek(0);
        return i;
      }

      // Check type of stream first
      switch(leadBytes) {
        case 0x1f8b: { // RFC 1952
          // Must be gzip
          i.seek(0);
          return new GZIPInputStream(i);
        }
        case 0x5345: { // 'S' 'E'
          // Might be a SequenceFile
          if (i.readByte() == 'Q') {
            i.close();
            return new TextRecordInputStream(item.stat);
          }
        }
        default: {
          // Check the type of compression instead, depending on Codec class's
          // own detection methods, based on the provided path.
          CompressionCodecFactory cf = new CompressionCodecFactory(getConf());
          CompressionCodec codec = cf.getCodec(item.path);
          if (codec != null) {
            i.seek(0);
            return codec.createInputStream(i);
          }
          break;
        }
        case 0x4f62: { // 'O' 'b'
          if (i.readByte() == 'j') {
            i.close();
            return new AvroFileInputStream(item.stat);
          }
          break;
        }
      }

      // File is non-compressed, or not a file container we know.
      i.seek(0);
      return i;
    }
  }
  
  public static class Checksum extends Display {
    public static final String NAME = "checksum";
    public static final String USAGE = "[-v] <src> ...";
    public static final String DESCRIPTION =
      "Dump checksum information for files that match the file " +
      "pattern <src> to stdout. Note that this requires a round-trip " +
      "to a datanode storing each block of the file, and thus is not " +
      "efficient to run on a large number of files. The checksum of a " +
      "file depends on its content, block size and the checksum " +
      "algorithm and parameters used for creating the file.";

    private boolean displayBlockSize;

    @Override
    protected void processOptions(LinkedList<String> args)
        throws IOException {
      CommandFormat cf = new CommandFormat(1, Integer.MAX_VALUE, "v");
      cf.parse(args);
      displayBlockSize = cf.getOpt("v");
    }

    @Override
    protected void processPath(PathData item) throws IOException {
      if (item.stat.isDirectory()) {
        throw new PathIsDirectoryException(item.toString());
      }

      FileChecksum checksum = item.fs.getFileChecksum(item.path);
      String outputChecksum = checksum == null ? "NONE" :
          String.format("%s\t%s", checksum.getAlgorithmName(), StringUtils
              .byteToHexString(checksum.getBytes(), 0, checksum.getLength()));
      if (displayBlockSize) {
        FileStatus fileStatus = item.fs.getFileStatus(item.path);
        out.printf("%s\t%s\tBlockSize=%s%n", item.toString(), outputChecksum,
            fileStatus != null ? fileStatus.getBlockSize() : "NONE");
      } else {
        out.printf("%s\t%s%n", item.toString(), outputChecksum);
      }
    }
  }

  protected class TextRecordInputStream extends InputStream {
    SequenceFile.Reader r;
    Writable key;
    Writable val;

    DataInputBuffer inbuf;
    DataOutputBuffer outbuf;

    public TextRecordInputStream(FileStatus f) throws IOException {
      final Path fpath = f.getPath();
      final Configuration lconf = getConf();
      r = new SequenceFile.Reader(lconf, 
          SequenceFile.Reader.file(fpath));
      key = ReflectionUtils.newInstance(
          r.getKeyClass().asSubclass(Writable.class), lconf);
      val = ReflectionUtils.newInstance(
          r.getValueClass().asSubclass(Writable.class), lconf);
      inbuf = new DataInputBuffer();
      outbuf = new DataOutputBuffer();
    }

    @Override
    public int read() throws IOException {
      int ret;
      if (null == inbuf || -1 == (ret = inbuf.read())) {
        if (!r.next(key, val)) {
          return -1;
        }
        byte[] tmp = key.toString().getBytes(StandardCharsets.UTF_8);
        outbuf.write(tmp, 0, tmp.length);
        outbuf.write('\t');
        tmp = val.toString().getBytes(StandardCharsets.UTF_8);
        outbuf.write(tmp, 0, tmp.length);
        outbuf.write('\n');
        inbuf.reset(outbuf.getData(), outbuf.getLength());
        outbuf.reset();
        ret = inbuf.read();
      }
      return ret;
    }

    @Override
    public void close() throws IOException {
      r.close();
      super.close();
    }
  }

  /**
   * This class transforms a binary Avro data file into an InputStream
   * with data that is in a human readable JSON format.
   */
  protected static class AvroFileInputStream extends InputStream {
    private int pos;
    private byte[] buffer;
    private ByteArrayOutputStream output;
    private FileReader<?> fileReader;
    private DatumWriter<Object> writer;
    private JsonEncoder encoder;

    public AvroFileInputStream(FileStatus status) throws IOException {
      pos = 0;
      buffer = new byte[0];
      GenericDatumReader<Object> reader = new GenericDatumReader<Object>();
      FileContext fc = FileContext.getFileContext(new Configuration());
      fileReader =
        DataFileReader.openReader(new AvroFSInput(fc, status.getPath()),reader);
      Schema schema = fileReader.getSchema();
      writer = new GenericDatumWriter<Object>(schema);
      output = new ByteArrayOutputStream();
      encoder = EncoderFactory.get().jsonEncoder(schema, output);
    }

    /**
     * Read a single byte from the stream.
     */
    @Override
    public int read() throws IOException {
      if (pos < buffer.length) {
        return buffer[pos++];
      }
      if (!fileReader.hasNext()) {
        return -1;
      }
      writer.write(fileReader.next(), encoder);
      encoder.flush();
      if (!fileReader.hasNext()) {
        // Write a new line after the last Avro record.
        output.write(System.getProperty("line.separator")
                         .getBytes(StandardCharsets.UTF_8));
        output.flush();
      }
      pos = 0;
      buffer = output.toByteArray();
      output.reset();
      return read();
    }

    /**
      * Close the stream.
      */
    @Override
    public void close() throws IOException {
      fileReader.close();
      output.close();
      super.close();
    }
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop AclCommands 源码

hadoop Command 源码

hadoop CommandFactory 源码

hadoop CommandFormat 源码

hadoop CommandUtils 源码

hadoop CommandWithDestination 源码

hadoop Concat 源码

hadoop CopyCommandWithMultiThread 源码

hadoop CopyCommands 源码

hadoop Count 源码

0  赞