hadoop MultipleOutputFormat 源码

  • 2022-10-20
  • 浏览 (154)

hadoop MultipleOutputFormat 代码

文件路径:/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultipleOutputFormat.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred.lib;

import java.io.IOException;
import java.util.Iterator;
import java.util.TreeMap;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.util.Progressable;

/**
 * This abstract class extends the FileOutputFormat, allowing to write the
 * output data to different output files. There are three basic use cases for
 * this class.
 * 
 * Case one: This class is used for a map reduce job with at least one reducer.
 * The reducer wants to write data to different files depending on the actual
 * keys. It is assumed that a key (or value) encodes the actual key (value)
 * and the desired location for the actual key (value).
 * 
 * Case two: This class is used for a map only job. The job wants to use an
 * output file name that is either a part of the input file name of the input
 * data, or some derivation of it.
 * 
 * Case three: This class is used for a map only job. The job wants to use an
 * output file name that depends on both the keys and the input file name.
 *
 * @param <K> the type of the output keys
 * @param <V> the type of the output values
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class MultipleOutputFormat<K, V>
extends FileOutputFormat<K, V> {

  /**
   * Create a composite record writer that can write key/value data to different
   * output files. The returned writer lazily creates one base record writer per
   * distinct output path (via
   * {@link #getBaseRecordWriter(FileSystem, JobConf, String, Progressable)})
   * and caches it until the composite writer is closed.
   * 
   * @param fs
   *          the file system to use
   * @param job
   *          the job conf for the job
   * @param name
   *          the leaf file name for the output file (such as "part-00000")
   * @param arg3
   *          a progressable for reporting progress.
   * @return a composite record writer
   * @throws IOException
   *           declared by the overridden contract; this method itself only
   *           builds the composite writer
   */
  public RecordWriter<K, V> getRecordWriter(FileSystem fs, JobConf job,
      String name, Progressable arg3) throws IOException {

    // Final copies so the anonymous writer below can capture them.
    final FileSystem myFS = fs;
    final String myName = generateLeafFileName(name);
    final JobConf myJob = job;
    final Progressable myProgressable = arg3;

    return new RecordWriter<K, V>() {

      // a cache storing the record writers for different output files,
      // keyed by the final output path.
      private final TreeMap<String, RecordWriter<K, V>> recordWriters =
          new TreeMap<String, RecordWriter<K, V>>();

      public void write(K key, V value) throws IOException {

        // get the file name based on the key
        String keyBasedPath = generateFileNameForKeyValue(key, value, myName);

        // get the file name based on the input file name
        String finalPath = getInputFileBasedOutputFileName(myJob, keyBasedPath);

        // get the actual key and value to be written
        K actualKey = generateActualKey(key, value);
        V actualValue = generateActualValue(key, value);

        RecordWriter<K, V> rw = this.recordWriters.get(finalPath);
        if (rw == null) {
          // no record writer yet for the final path: create one and add it
          // to the cache
          rw = getBaseRecordWriter(myFS, myJob, finalPath, myProgressable);
          this.recordWriters.put(finalPath, rw);
        }
        rw.write(actualKey, actualValue);
      }

      public void close(Reporter reporter) throws IOException {
        // Close every cached writer even if one of them fails; otherwise a
        // single failing close would leave all remaining writers open.
        IOException firstFailure = null;
        for (RecordWriter<K, V> rw : this.recordWriters.values()) {
          try {
            rw.close(reporter);
          } catch (IOException e) {
            if (firstFailure == null) {
              firstFailure = e;
            } else {
              firstFailure.addSuppressed(e);
            }
          }
        }
        this.recordWriters.clear();
        if (firstFailure != null) {
          throw firstFailure;
        }
      }
    };
  }

  /**
   * Generate the leaf name for the output file name. The default behavior does
   * not change the leaf file name (such as part-00000)
   * 
   * @param name
   *          the leaf file name for the output file
   * @return the given leaf file name
   */
  protected String generateLeafFileName(String name) {
    return name;
  }

  /**
   * Generate the file output file name based on the given key/value and the
   * leaf file name. The default behavior is that the file name does not depend
   * on the key or the value.
   * 
   * @param key
   *          the key of the output data
   * @param value
   *          the value of the output data
   * @param name
   *          the leaf file name
   * @return generated file name
   */
  protected String generateFileNameForKeyValue(K key, V value, String name) {
    return name;
  }

  /**
   * Generate the actual key from the given key/value. The default behavior is that
   * the actual key is equal to the given key
   * 
   * @param key
   *          the key of the output data
   * @param value
   *          the value of the output data
   * @return the actual key derived from the given key/value
   */
  protected K generateActualKey(K key, V value) {
    return key;
  }
  
  /**
   * Generate the actual value from the given key and value. The default behavior is that
   * the actual value is equal to the given value
   * 
   * @param key
   *          the key of the output data
   * @param value
   *          the value of the output data
   * @return the actual value derived from the given key/value
   */
  protected V generateActualValue(K key, V value) {
    return value;
  }
  

  /**
   * Generate the outfile name based on a given name and the input file name. If
   * the {@link MRJobConfig#MAP_INPUT_FILE} does not exist (i.e. this is not for
   * a map only job), the given name is returned unchanged. If the config value
   * for "mapred.outputformat.numOfTrailingLegs" is not set, or set to 0 or
   * negative, the given name is returned unchanged. Otherwise, return a file
   * name consisting of the N trailing legs of the input file name where N is
   * the config value for "mapred.outputformat.numOfTrailingLegs"; in that case
   * the given name is not part of the result.
   * 
   * @param job
   *          the job config
   * @param name
   *          the output file name
   * @return the outfile name based on a given name and the input file name.
   */
  protected String getInputFileBasedOutputFileName(JobConf job, String name) {
    String infilepath = job.get(MRJobConfig.MAP_INPUT_FILE);
    if (infilepath == null) {
      // if the MAP_INPUT_FILE property does not exist,
      // then return the given name
      return name;
    }
    int numOfTrailingLegsToUse = job.getInt("mapred.outputformat.numOfTrailingLegs", 0);
    if (numOfTrailingLegsToUse <= 0) {
      return name;
    }
    Path infile = new Path(infilepath);
    Path parent = infile.getParent();
    String midName = infile.getName();
    // The file name itself is the first leg; the loop prepends up to N-1
    // parent directory names, stopping early when the path runs out.
    Path outPath = new Path(midName);
    for (int i = 1; i < numOfTrailingLegsToUse; i++) {
      if (parent == null) {
        break;
      }
      midName = parent.getName();
      if (midName.length() == 0) {
        break;
      }
      parent = parent.getParent();
      outPath = new Path(midName, outPath);
    }
    return outPath.toString();
  }

  /**
   * Create the record writer that actually writes to a single output file.
   * The composite writer returned by
   * {@link #getRecordWriter(FileSystem, JobConf, String, Progressable)} calls
   * this once per distinct output path and caches the result.
   * 
   * @param fs
   *          the file system to use
   * @param job
   *          a job conf object
   * @param name
   *          the name of the file over which a record writer object will be
   *          constructed
   * @param arg3
   *          a progressable object
   * @return A RecordWriter object over the given file
   * @throws IOException
   *           if the record writer cannot be created
   */
  abstract protected RecordWriter<K, V> getBaseRecordWriter(FileSystem fs,
      JobConf job, String name, Progressable arg3) throws IOException;
}

相关信息

hadoop 源码目录

相关文章

hadoop BinaryPartitioner 源码

hadoop Chain 源码

hadoop ChainMapper 源码

hadoop ChainReducer 源码

hadoop CombineFileInputFormat 源码

hadoop CombineFileRecordReader 源码

hadoop CombineFileRecordReaderWrapper 源码

hadoop CombineFileSplit 源码

hadoop CombineSequenceFileInputFormat 源码

hadoop CombineTextInputFormat 源码

0  赞