spark LevelDBIterator 源码

  • 2022-10-20
  • 浏览 (538)

spark LevelDBIterator 代码

文件路径:/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.util.kvstore;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import org.iq80.leveldb.DBIterator;

/**
 * Iterator over the entities of type {@code T} stored in a {@link LevelDB} instance, following
 * the index, direction, bounds, skip count and maximum element count described by a
 * {@link KVStoreView}.
 *
 * <p>The iterator wraps a LevelDB {@link DBIterator}. It closes itself once iteration is
 * exhausted (see {@link #hasNext()}); callers that stop early should call {@link #close()}.
 */
class LevelDBIterator<T> implements KVStoreIterator<T> {

  /** Store that created this iterator; used for deserialization, lookups and close callbacks. */
  private final LevelDB db;
  /** Whether iteration moves forward ({@code it.next()}) or backward ({@code it.prev()}). */
  private final boolean ascending;
  /** Underlying LevelDB iterator, positioned by the constructor. */
  private final DBIterator it;
  /** Entity class returned by this iterator. */
  private final Class<T> type;
  /** Type metadata for {@link #type}, as provided by the store. */
  private final LevelDBTypeInfo ti;
  /** Index being iterated. */
  private final LevelDBTypeInfo.Index index;
  /** Key prefix shared by all entries of the index; a key without it ends the iteration. */
  private final byte[] indexKeyPrefix;
  /** Key at which iteration stops, or null when descending without a "last" value. */
  private final byte[] end;
  /** Maximum number of elements this iterator will return. */
  private final long max;

  /** True when {@link #next} reflects the result of the latest {@link #loadNext()} call. */
  private boolean checkedNext;
  /** Value of the next entry to be returned, or null if none has been loaded. */
  private byte[] next;
  /** Set once the underlying iterator has been closed. */
  private boolean closed;
  /** Number of elements loaded so far; compared against {@link #max}. */
  private long count;

  /**
   * Creates the iterator and positions it at the first entry to be returned.
   *
   * @param type entity class to iterate over.
   * @param db store that holds the data.
   * @param params view describing index, parent, direction, first / last values, skip and max.
   * @throws IllegalArgumentException if the index is a child index and no parent value was given.
   * @throws Exception if the type information or index keys cannot be built.
   */
  LevelDBIterator(Class<T> type, LevelDB db, KVStoreView<T> params) throws Exception {
    this.db = db;
    this.ascending = params.ascending;
    this.it = db.db().iterator();
    this.type = type;
    this.ti = db.getTypeInfo(type);
    this.index = ti.index(params.index);
    this.max = params.max;

    Preconditions.checkArgument(!index.isChild() || params.parent != null,
      "Cannot iterate over child index %s without parent value.", params.index);
    byte[] parent = index.isChild() ? index.parent().childPrefix(params.parent) : null;

    this.indexKeyPrefix = index.keyPrefix(parent);

    // Pick the key to seek to: the requested "first" value if there is one, otherwise the
    // beginning (ascending) or the end (descending) of the index.
    byte[] firstKey;
    if (params.first != null) {
      if (ascending) {
        firstKey = index.start(parent, params.first);
      } else {
        firstKey = index.end(parent, params.first);
      }
    } else if (ascending) {
      firstKey = index.keyPrefix(parent);
    } else {
      firstKey = index.end(parent);
    }
    it.seek(firstKey);

    // Compute the key that bounds the iteration on the far side.
    byte[] end = null;
    if (ascending) {
      if (params.last != null) {
        end = index.end(parent, params.last);
      } else {
        end = index.end(parent);
      }
    } else {
      if (params.last != null) {
        end = index.start(parent, params.last);
      }
      if (it.hasNext()) {
        // When descending, the caller may have set up the start of iteration at a non-existent
        // entry that is guaranteed to be after the desired entry. For example, if you have a
        // compound key (a, b) where b is an integer, you may seek to the end of the elements that
        // have the same "a" value by specifying Integer.MAX_VALUE for "b", and that value may not
        // exist in the database. So need to check here whether the next value actually belongs to
        // the set being returned by the iterator before advancing.
        byte[] nextKey = it.peekNext().getKey();
        if (compare(nextKey, indexKeyPrefix) <= 0) {
          it.next();
        }
      }
    }
    this.end = end;

    if (params.skip > 0) {
      skip(params.skip);
    }
  }

  /**
   * Returns whether another element is available, loading it from the database if needed.
   * When no further element exists the iterator closes itself.
   */
  @Override
  public boolean hasNext() {
    if (!checkedNext && !closed) {
      next = loadNext();
      checkedNext = true;
    }
    // Iteration is exhausted: release the underlying iterator right away.
    if (!closed && next == null) {
      try {
        close();
      } catch (IOException ioe) {
        throw Throwables.propagate(ioe);
      }
    }
    return next != null;
  }

  /**
   * Returns the next element.
   *
   * @throws NoSuchElementException if there are no more elements.
   */
  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    checkedNext = false;

    try {
      T ret;
      if (index == null || index.isCopy()) {
        // The loaded value is the serialized entity itself.
        ret = db.serializer.deserialize(next, type);
      } else {
        // The loaded value is combined with the natural index prefix to build the key under
        // which the entity is looked up.
        byte[] key = ti.buildKey(false, ti.naturalIndex().keyPrefix(null), next);
        ret = db.get(key, type);
      }
      next = null;
      return ret;
    } catch (Exception e) {
      throw Throwables.propagate(e);
    }
  }

  /** Not supported by this iterator. */
  @Override
  public void remove() {
    throw new UnsupportedOperationException();
  }

  /**
   * Returns up to {@code max} elements, fewer if the iteration ends first.
   *
   * @param max maximum number of elements to return.
   * @return the elements read, in iteration order.
   */
  @Override
  public List<T> next(int max) {
    List<T> list = new ArrayList<>(max);
    while (hasNext() && list.size() < max) {
      list.add(next());
    }
    return list;
  }

  /**
   * Skips {@code n} elements. An element already loaded by {@link #hasNext()} counts as one;
   * after that, raw database entries are consumed and every entry that is not an end marker
   * counts as skipped.
   *
   * @param n number of elements to skip.
   * @return whether there are elements left after skipping; false if the iterator is closed or
   *   the database iterator runs out of entries.
   */
  @Override
  public boolean skip(long n) {
    if (closed) return false;

    long skipped = 0;
    while (skipped < n) {
      // Discard the element that was already loaded, if any.
      if (next != null) {
        checkedNext = false;
        next = null;
        skipped++;
        continue;
      }

      boolean hasNext = ascending ? it.hasNext() : it.hasPrev();
      if (!hasNext) {
        // Nothing left; mark the (null) next element as checked so hasNext() does not reload.
        checkedNext = true;
        return false;
      }

      Map.Entry<byte[], byte[]> e = ascending ? it.next() : it.prev();
      if (!isEndMarker(e.getKey())) {
        skipped++;
      }
    }

    return hasNext();
  }

  /**
   * Notifies the owning store that this iterator is closed and, the first time it is called,
   * closes the underlying LevelDB iterator and drops any loaded element.
   */
  @Override
  public synchronized void close() throws IOException {
    db.notifyIteratorClosed(this);
    if (!closed) {
      it.close();
      closed = true;
      next = null;
    }
  }

  /**
   * Because it's tricky to expose closeable iterators through many internal APIs, especially
   * when Scala wrappers are used, this makes sure that, hopefully, the JNI resources held by
   * the iterator will eventually be released.
   */
  @SuppressWarnings("deprecation")
  @Override
  protected void finalize() throws Throwable {
    db.closeIterator(this);
  }

  /**
   * Advances the underlying iterator to the next entry that belongs to this iteration.
   *
   * @return the value of that entry, or null if the maximum count was reached, the database
   *   has no more entries, the entry is outside the index, or the end key was passed.
   */
  private byte[] loadNext() {
    if (count >= max) {
      return null;
    }

    while (true) {
      boolean hasNext = ascending ? it.hasNext() : it.hasPrev();
      if (!hasNext) {
        return null;
      }

      Map.Entry<byte[], byte[]> nextEntry;
      try {
        // Avoid races if another thread is updating the DB.
        nextEntry = ascending ? it.next() : it.prev();
      } catch (NoSuchElementException e) {
        return null;
      }

      byte[] nextKey = nextEntry.getKey();
      // Next key is not part of the index, stop.
      if (!startsWith(nextKey, indexKeyPrefix)) {
        return null;
      }

      // If the next key is an end marker, then skip it.
      if (isEndMarker(nextKey)) {
        continue;
      }

      // If there's a known end key and iteration has gone past it, stop.
      if (end != null) {
        // Flip the sign when descending so that "past the end" is always a positive value.
        int comp = compare(nextKey, end) * (ascending ? 1 : -1);
        if (comp > 0) {
          return null;
        }
      }

      count++;

      // Next element is part of the iteration, return it.
      return nextEntry.getValue();
    }
  }

  /**
   * Returns whether {@code key} begins with all the bytes of {@code prefix}.
   */
  @VisibleForTesting
  static boolean startsWith(byte[] key, byte[] prefix) {
    if (key.length < prefix.length) {
      return false;
    }

    for (int i = 0; i < prefix.length; i++) {
      if (key[i] != prefix[i]) {
        return false;
      }
    }

    return true;
  }

  /**
   * Returns whether the key's last two bytes are the key separator followed by the first byte
   * of the end marker.
   */
  private boolean isEndMarker(byte[] key) {
    return (key.length > 2 &&
        key[key.length - 2] == LevelDBTypeInfo.KEY_SEPARATOR &&
        key[key.length - 1] == LevelDBTypeInfo.END_MARKER[0]);
  }

  /**
   * Compares two keys lexicographically, byte by byte, treating each byte as an unsigned value
   * (0-255). If one key is a prefix of the other, the shorter key sorts first.
   *
   * <p>Bytes are compared as unsigned so that the result agrees with the bytewise ordering
   * LevelDB uses for keys by default; comparing Java's signed bytes directly would order any
   * byte &gt;= 0x80 (for example, non-ASCII UTF-8 data) before all ASCII bytes.
   *
   * @return a negative value, zero, or a positive value if {@code a} sorts before, equal to, or
   *   after {@code b}.
   */
  static int compare(byte[] a, byte[] b) {
    int minLen = Math.min(a.length, b.length);
    for (int i = 0; i < minLen; i++) {
      // Mask to undo sign extension; the difference fits in [-255, 255] so it cannot overflow.
      int diff = (a[i] & 0xff) - (b[i] & 0xff);
      if (diff != 0) {
        return diff;
      }
    }

    return a.length - b.length;
  }

}

相关信息

spark 源码目录

相关文章

spark ArrayWrappers 源码

spark InMemoryStore 源码

spark KVIndex 源码

spark KVStore 源码

spark KVStoreIterator 源码

spark KVStoreSerializer 源码

spark KVStoreView 源码

spark KVTypeInfo 源码

spark LevelDB 源码

spark LevelDBTypeInfo 源码

0  赞