hadoop ContextFactory 源码

  • 2022-10-20
  • 浏览 (199)

haddop ContextFactory 代码

文件路径:/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/ContextFactory.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce;
 
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;

import org.apache.hadoop.conf.Configuration;
 
/**
 * A factory to allow applications to deal with inconsistencies between
 * MapReduce Context Objects API between hadoop-0.20 and later versions.
 */
public class ContextFactory {

  private static final Constructor<?> JOB_CONTEXT_CONSTRUCTOR;
  private static final Constructor<?> TASK_CONTEXT_CONSTRUCTOR;
  private static final Constructor<?> MAP_CONTEXT_CONSTRUCTOR;
  private static final Constructor<?> MAP_CONTEXT_IMPL_CONSTRUCTOR;
  private static final boolean useV21;
  
  private static final Field REPORTER_FIELD;
  private static final Field READER_FIELD;
  private static final Field WRITER_FIELD;
  private static final Field OUTER_MAP_FIELD;
  private static final Field WRAPPED_CONTEXT_FIELD;
  
  static {
    boolean v21 = true;
    final String PACKAGE = "org.apache.hadoop.mapreduce";
    try {
      Class.forName(PACKAGE + ".task.JobContextImpl");
    } catch (ClassNotFoundException cnfe) {
      v21 = false;
    }
    useV21 = v21;
    Class<?> jobContextCls;
    Class<?> taskContextCls;
    Class<?> taskIOContextCls;
    Class<?> mapCls;
    Class<?> mapContextCls;
    Class<?> innerMapContextCls;
    try {
      if (v21) {
        jobContextCls = 
          Class.forName(PACKAGE+".task.JobContextImpl");
        taskContextCls = 
          Class.forName(PACKAGE+".task.TaskAttemptContextImpl");
        taskIOContextCls = 
          Class.forName(PACKAGE+".task.TaskInputOutputContextImpl");
        mapContextCls = Class.forName(PACKAGE + ".task.MapContextImpl");
        mapCls = Class.forName(PACKAGE + ".lib.map.WrappedMapper");
        innerMapContextCls = 
          Class.forName(PACKAGE+".lib.map.WrappedMapper$Context");
      } else {
        jobContextCls = 
          Class.forName(PACKAGE+".JobContext");
        taskContextCls = 
          Class.forName(PACKAGE+".TaskAttemptContext");
        taskIOContextCls = 
          Class.forName(PACKAGE+".TaskInputOutputContext");
        mapContextCls = Class.forName(PACKAGE + ".MapContext");
        mapCls = Class.forName(PACKAGE + ".Mapper");
        innerMapContextCls = 
          Class.forName(PACKAGE+".Mapper$Context");      
      }
    } catch (ClassNotFoundException e) {
      throw new IllegalArgumentException("Can't find class", e);
    }
    try {
      JOB_CONTEXT_CONSTRUCTOR = 
        jobContextCls.getConstructor(Configuration.class, JobID.class);
      JOB_CONTEXT_CONSTRUCTOR.setAccessible(true);
      TASK_CONTEXT_CONSTRUCTOR = 
        taskContextCls.getConstructor(Configuration.class, 
                                      TaskAttemptID.class);
      TASK_CONTEXT_CONSTRUCTOR.setAccessible(true);
      if (useV21) {
        MAP_CONTEXT_CONSTRUCTOR = 
          innerMapContextCls.getConstructor(mapCls,
                                            MapContext.class);
        MAP_CONTEXT_IMPL_CONSTRUCTOR =
          mapContextCls.getDeclaredConstructor(Configuration.class, 
                                               TaskAttemptID.class,
                                               RecordReader.class,
                                               RecordWriter.class,
                                               OutputCommitter.class,
                                               StatusReporter.class,
                                               InputSplit.class);
        MAP_CONTEXT_IMPL_CONSTRUCTOR.setAccessible(true);
        WRAPPED_CONTEXT_FIELD = 
          innerMapContextCls.getDeclaredField("mapContext");
        WRAPPED_CONTEXT_FIELD.setAccessible(true);
      } else {
        MAP_CONTEXT_CONSTRUCTOR = 
          innerMapContextCls.getConstructor(mapCls,
                                            Configuration.class, 
                                            TaskAttemptID.class,
                                            RecordReader.class,
                                            RecordWriter.class,
                                            OutputCommitter.class,
                                            StatusReporter.class,
                                            InputSplit.class);
        MAP_CONTEXT_IMPL_CONSTRUCTOR = null;
        WRAPPED_CONTEXT_FIELD = null;
      }
      MAP_CONTEXT_CONSTRUCTOR.setAccessible(true);
      REPORTER_FIELD = taskContextCls.getDeclaredField("reporter");
      REPORTER_FIELD.setAccessible(true);
      READER_FIELD = mapContextCls.getDeclaredField("reader");
      READER_FIELD.setAccessible(true);
      WRITER_FIELD = taskIOContextCls.getDeclaredField("output");
      WRITER_FIELD.setAccessible(true);
      OUTER_MAP_FIELD = innerMapContextCls.getDeclaredField("this$0");
      OUTER_MAP_FIELD.setAccessible(true);
    } catch (SecurityException e) {
      throw new IllegalArgumentException("Can't run constructor ", e);
    } catch (NoSuchMethodException e) {
      throw new IllegalArgumentException("Can't find constructor ", e);
    } catch (NoSuchFieldException e) {
      throw new IllegalArgumentException("Can't find field ", e);
    }
  }

  /**
   * Clone a {@link JobContext} or {@link TaskAttemptContext} with a 
   * new configuration.
   * @param original the original context
   * @param conf the new configuration
   * @return a new context object
   * @throws InterruptedException 
   * @throws IOException 
   */
  @SuppressWarnings("unchecked")
  public static JobContext cloneContext(JobContext original,
                                        Configuration conf
                                        ) throws IOException, 
                                                 InterruptedException {
    try {
      if (original instanceof MapContext<?,?,?,?>) {
        return cloneMapContext((Mapper.Context) original, conf, null, null);        
      } else if (original instanceof ReduceContext<?,?,?,?>) {
        throw new IllegalArgumentException("can't clone ReduceContext");
      } else if (original instanceof TaskAttemptContext) {
        TaskAttemptContext spec = (TaskAttemptContext) original;
        return (JobContext) 
          TASK_CONTEXT_CONSTRUCTOR.newInstance(conf, spec.getTaskAttemptID());
      } else {
        return (JobContext) 
        JOB_CONTEXT_CONSTRUCTOR.newInstance(conf, original.getJobID());           
      }
    } catch (InstantiationException e) {
      throw new IllegalArgumentException("Can't clone object", e);
    } catch (IllegalAccessException e) {
      throw new IllegalArgumentException("Can't clone object", e);
    } catch (InvocationTargetException e) {
      throw new IllegalArgumentException("Can't clone object", e);
    }
  }
  
  /**
   * Copy a custom WrappedMapper.Context, optionally replacing 
   * the input and output.
   * @param <K1> input key type
   * @param <V1> input value type
   * @param <K2> output key type
   * @param <V2> output value type
   * @param context the context to clone
   * @param conf a new configuration
   * @param reader Reader to read from. Null means to clone from context.
   * @param writer Writer to write to. Null means to clone from context.
   * @return a new context. it will not be the same class as the original.
   * @throws IOException
   * @throws InterruptedException
   */
  @SuppressWarnings("unchecked")
  public static <K1,V1,K2,V2> Mapper<K1,V1,K2,V2>.Context 
       cloneMapContext(MapContext<K1,V1,K2,V2> context,
                       Configuration conf,
                       RecordReader<K1,V1> reader,
                       RecordWriter<K2,V2> writer
                      ) throws IOException, InterruptedException {
    try {
      // get the outer object pointer
      Object outer = OUTER_MAP_FIELD.get(context);
      // if it is a wrapped 21 context, unwrap it
      if ("org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context".equals
            (context.getClass().getName())) {
        context = (MapContext<K1,V1,K2,V2>) WRAPPED_CONTEXT_FIELD.get(context);
      }
      // if the reader or writer aren't given, use the same ones
      if (reader == null) {
        reader = (RecordReader<K1,V1>) READER_FIELD.get(context);
      }
      if (writer == null) {
        writer = (RecordWriter<K2,V2>) WRITER_FIELD.get(context);
      }
      if (useV21) {
        Object basis = 
          MAP_CONTEXT_IMPL_CONSTRUCTOR.newInstance(conf, 
                                                   context.getTaskAttemptID(),
                                                   reader, writer,
                                                   context.getOutputCommitter(),
                                                   REPORTER_FIELD.get(context),
                                                   context.getInputSplit());
        return (Mapper.Context) 
          MAP_CONTEXT_CONSTRUCTOR.newInstance(outer, basis);
      } else {
        return (Mapper.Context)
          MAP_CONTEXT_CONSTRUCTOR.newInstance(outer,
                                              conf, context.getTaskAttemptID(),
                                              reader, writer,
                                              context.getOutputCommitter(),
                                              REPORTER_FIELD.get(context),
                                              context.getInputSplit());
      }
    } catch (IllegalAccessException e) {
      throw new IllegalArgumentException("Can't access field", e);
    } catch (InstantiationException e) {
      throw new IllegalArgumentException("Can't create object", e);
    } catch (InvocationTargetException e) {
      throw new IllegalArgumentException("Can't invoke constructor", e);
    }
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop Cluster 源码

hadoop ClusterMetrics 源码

hadoop Counter 源码

hadoop CounterGroup 源码

hadoop Counters 源码

hadoop CryptoUtils 源码

hadoop CustomJobEndNotifier 源码

hadoop FileSystemCounter 源码

hadoop ID 源码

hadoop InputFormat 源码

0  赞