hadoop WrappedRecordReader 源码

  • 2022-10-20
  • 浏览 (172)

hadoop WrappedRecordReader 代码

文件路径:/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/join/WrappedRecordReader.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred.join;

import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapred.RecordReader;

/**
 * Proxy class for a RecordReader participating in the join framework.
 * This class keeps track of the "head" key-value pair for the
 * provided RecordReader and keeps a store of values matching a key when
 * this source is participating in a join.
 *
 * <p>The head pair is loaded eagerly: the constructor reads the first
 * key-value pair from the proxied RecordReader, and every operation that
 * consumes the head immediately reads the following pair.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class WrappedRecordReader<K extends WritableComparable,
                          U extends Writable>
    implements ComposableRecordReader<K,U>, Configurable {

  // true once the proxied RR has reported that no further pair is available
  private boolean empty = false;
  private final RecordReader<K,U> rr;
  private final int id;  // index at which values will be inserted in collector

  private final K khead; // key at the top of this RR (refilled in place)
  private final U vhead; // value assoc with khead (refilled in place)
  private final WritableComparator cmp;
  private Configuration conf;

  // values collected for the key most recently passed to accept()
  private final ResetableIterator<U> vjoin;

  /**
   * For a given RecordReader rr, occupy position id in collector.
   * A fresh {@link Configuration} is used.
   *
   * @param id position this reader occupies in the collector
   * @param rr the RecordReader to proxy
   * @param cmpcl comparator class to instantiate for key comparison, or
   *              null to look one up from the key class
   * @throws IOException if the comparator cannot be instantiated or the
   *                     first read from rr fails
   */
  WrappedRecordReader(int id, RecordReader<K,U> rr,
      Class<? extends WritableComparator> cmpcl) throws IOException {
    this(id, rr, cmpcl, null);
  }

  /**
   * For a given RecordReader rr, occupy position id in collector and read
   * the first key-value pair into the head of this object.
   *
   * @param id position this reader occupies in the collector
   * @param rr the RecordReader to proxy
   * @param cmpcl comparator class to instantiate for key comparison, or
   *              null to look one up from the key class
   * @param conf configuration to use; a new {@link Configuration} is
   *             created when null
   * @throws IOException if the comparator cannot be instantiated or the
   *                     first read from rr fails
   */
  WrappedRecordReader(int id, RecordReader<K,U> rr,
                      Class<? extends WritableComparator> cmpcl,
                      Configuration conf) throws IOException {
    this.id = id;
    this.rr = rr;
    this.conf = (conf == null) ? new Configuration() : conf;
    khead = rr.createKey();
    vhead = rr.createValue();
    try {
      cmp = (null == cmpcl)
        ? WritableComparator.get(khead.getClass(), this.conf)
        : cmpcl.newInstance();
    } catch (InstantiationException e) {
      // Surface reflection failures as IOException, keeping the cause and
      // naming the comparator class so the failure is diagnosable.
      throw new IOException("Unable to instantiate comparator " + cmpcl, e);
    } catch (IllegalAccessException e) {
      throw new IOException("Unable to access comparator " + cmpcl, e);
    }
    vjoin = new StreamBackedIterator<U>();
    // Prime the head pair so key()/hasNext() are meaningful immediately.
    next();
  }

  /** {@inheritDoc} */
  public int id() {
    return id;
  }

  /**
   * Return the key at the head of this RR.
   * The returned object is the internal head key, not a copy.
   */
  public K key() {
    return khead;
  }

  /**
   * Clone the key at the head of this RR into the object supplied.
   *
   * @param qkey object that receives a copy of the head key
   */
  public void key(K qkey) throws IOException {
    WritableUtils.cloneInto(qkey, khead);
  }

  /**
   * Return true if a key-value pair is available at the head of this
   * object, i.e. the last read from the proxied RR succeeded; return false
   * once the RR is exhausted.
   */
  public boolean hasNext() {
    return !empty;
  }

  /**
   * Skip key-value pairs with keys less than or equal to the key provided.
   * Does nothing if this reader is already exhausted.
   *
   * @param key upper bound (inclusive) of the keys to discard
   */
  public void skip(K key) throws IOException {
    if (!hasNext()) {
      return;
    }
    while (cmp.compare(khead, key) <= 0 && next()) {
      // next() already advanced the head; nothing more to do per pair
    }
  }

  /**
   * Read the next k,v pair into the head of this object.
   *
   * @return true iff a pair was read, i.e. the same value as
   *         {@link #hasNext()} after the read; false once the proxied RR
   *         is exhausted
   */
  protected boolean next() throws IOException {
    empty = !rr.next(khead, vhead);
    return hasNext();
  }

  /**
   * Add an iterator to the collector at the position occupied by this
   * RecordReader over the values in this stream paired with the key
   * provided (ie register a stream of values from this source matching K
   * with a collector).
   *
   * <p>The value store is cleared on every call, so the iterator handed to
   * the collector is empty when the head key does not match. Matching
   * pairs are consumed from the proxied RR.
   *
   * @param i collector receiving the value iterator at position id
   * @param key key whose values should be gathered
   */
                                 // JoinCollector comes from parent, which has
  @SuppressWarnings("unchecked") // no static type for the slot this sits in
  public void accept(CompositeRecordReader.JoinCollector i, K key)
      throws IOException {
    vjoin.clear();
    if (0 == cmp.compare(key, khead)) {
      do {
        vjoin.add(vhead);
      } while (next() && 0 == cmp.compare(key, khead));
    }
    i.add(id, vjoin);
  }

  /**
   * Write key-value pair at the head of this stream to the objects provided;
   * get next key-value pair from proxied RR.
   *
   * @param key object that receives a copy of the head key
   * @param value object that receives a copy of the head value
   * @return true if a pair was copied; false if this reader was already
   *         exhausted, in which case key and value are left untouched
   */
  public boolean next(K key, U value) throws IOException {
    if (!hasNext()) {
      return false;
    }
    WritableUtils.cloneInto(key, khead);
    WritableUtils.cloneInto(value, vhead);
    next();
    return true;
  }

  /**
   * Request new key from proxied RR.
   */
  public K createKey() {
    return rr.createKey();
  }

  /**
   * Request new value from proxied RR.
   */
  public U createValue() {
    return rr.createValue();
  }

  /**
   * Request progress from proxied RR.
   */
  public float getProgress() throws IOException {
    return rr.getProgress();
  }

  /**
   * Request position from proxied RR.
   */
  public long getPos() throws IOException {
    return rr.getPos();
  }

  /**
   * Forward close request to proxied RR.
   */
  public void close() throws IOException {
    rr.close();
  }

  /**
   * Implement Comparable contract (compare key at head of proxied RR
   * with that of another).
   */
  public int compareTo(ComposableRecordReader<K,?> other) {
    return cmp.compare(key(), other.key());
  }

  /**
   * Return true iff other is a ComposableRecordReader and
   * compareTo(other) returns 0, i.e. both head keys compare as equal.
   */
  @Override
  @SuppressWarnings("unchecked") // Explicit type check prior to cast
  public boolean equals(Object other) {
    return other instanceof ComposableRecordReader
        && 0 == compareTo((ComposableRecordReader)other);
  }

  /**
   * Not designed for use in hash-based collections: trips an assertion
   * when assertions are enabled, otherwise returns a constant.
   */
  @Override
  public int hashCode() {
    assert false : "hashCode not designed";
    return 42;
  }

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }

  @Override
  public Configuration getConf() {
    return conf;
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop ArrayListBackedIterator 源码

hadoop ComposableInputFormat 源码

hadoop ComposableRecordReader 源码

hadoop CompositeInputFormat 源码

hadoop CompositeInputSplit 源码

hadoop CompositeRecordReader 源码

hadoop InnerJoinRecordReader 源码

hadoop JoinRecordReader 源码

hadoop MultiFilterRecordReader 源码

hadoop OuterJoinRecordReader 源码

0  赞