hadoop FileSystemMultipartUploader source code

  • 2022-10-20

hadoop FileSystemMultipartUploader code

File path: /hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FileSystemMultipartUploader.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.impl;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import org.apache.hadoop.thirdparty.com.google.common.base.Charsets;
import org.apache.hadoop.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.commons.compress.utils.IOUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.BBPartHandle;
import org.apache.hadoop.fs.BBUploadHandle;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.InternalOperations;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.PartHandle;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathHandle;
import org.apache.hadoop.fs.UploadHandle;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.functional.FutureIO;

import static org.apache.hadoop.fs.Path.mergePaths;
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;

/**
 * A MultipartUploader that uses the basic FileSystem commands.
 * This is done in three stages:
 * <ul>
 *   <li>Init - create a temp {@code _multipart} directory.</li>
 *   <li>PutPart - copying the individual parts of the file to the temp
 *   directory.</li>
 *   <li>Complete - use {@link FileSystem#concat} to merge the files;
 *   and then delete the temp directory.</li>
 * </ul>
 */
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class FileSystemMultipartUploader extends AbstractMultipartUploader {

  private static final Logger LOG = LoggerFactory.getLogger(
      FileSystemMultipartUploader.class);

  private final FileSystem fs;

  private final FileSystemMultipartUploaderBuilder builder;

  private final FsPermission permission;

  private final long blockSize;

  private final Options.ChecksumOpt checksumOpt;

  public FileSystemMultipartUploader(
      final FileSystemMultipartUploaderBuilder builder,
      FileSystem fs) {
    super(builder.getPath());
    this.builder = builder;
    this.fs = fs;
    blockSize = builder.getBlockSize();
    checksumOpt = builder.getChecksumOpt();
    permission = builder.getPermission();
  }

  @Override
  public CompletableFuture<UploadHandle> startUpload(Path filePath)
      throws IOException {
    checkPath(filePath);
    return FutureIO.eval(() -> {
      Path collectorPath = createCollectorPath(filePath);
      fs.mkdirs(collectorPath, FsPermission.getDirDefault());

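      // The upload handle is simply the collector directory's path,
      // encoded as UTF-8 bytes; putPart/complete decode it back later.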
      ByteBuffer byteBuffer = ByteBuffer.wrap(
          collectorPath.toString().getBytes(Charsets.UTF_8));
      return BBUploadHandle.from(byteBuffer);
    });
  }

  @Override
  public CompletableFuture<PartHandle> putPart(UploadHandle uploadId,
      int partNumber, Path filePath,
      InputStream inputStream,
      long lengthInBytes)
      throws IOException {
    checkPutArguments(filePath, inputStream, partNumber, uploadId,
        lengthInBytes);
    return FutureIO.eval(() -> innerPutPart(filePath,
        inputStream, partNumber, uploadId, lengthInBytes));
  }

  private PartHandle innerPutPart(Path filePath,
      InputStream inputStream,
      int partNumber,
      UploadHandle uploadId,
      long lengthInBytes)
      throws IOException {
    byte[] uploadIdByteArray = uploadId.toByteArray();
    checkUploadId(uploadIdByteArray);
    Path collectorPath = new Path(new String(uploadIdByteArray, 0,
        uploadIdByteArray.length, Charsets.UTF_8));
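    // Stage this part in the collector directory as <partNumber>.part.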
    Path partPath =
        mergePaths(collectorPath, mergePaths(new Path(Path.SEPARATOR),
            new Path(partNumber + ".part")));
    final FSDataOutputStreamBuilder fileBuilder = fs.createFile(partPath);
    if (checksumOpt != null) {
      fileBuilder.checksumOpt(checksumOpt);
    }
    if (permission != null) {
      fileBuilder.permission(permission);
    }
    try (FSDataOutputStream fsDataOutputStream =
             fileBuilder.blockSize(blockSize).build()) {
      IOUtils.copy(inputStream, fsDataOutputStream,
          this.builder.getBufferSize());
    } finally {
      cleanupWithLogger(LOG, inputStream);
    }
    return BBPartHandle.from(ByteBuffer.wrap(
        partPath.toString().getBytes(Charsets.UTF_8)));
  }

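  /**
   * Build the temporary collector directory path for an upload: a unique
   * {@code _multipart_<uuid>} location derived from the target file's
   * parent directory and base name.
   */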
  private Path createCollectorPath(Path filePath) {
    String uuid = UUID.randomUUID().toString();
    return mergePaths(filePath.getParent(),
        mergePaths(new Path(filePath.getName().split("\\.")[0]),
            mergePaths(new Path("_multipart_" + uuid),
                new Path(Path.SEPARATOR))));
  }

  private PathHandle getPathHandle(Path filePath) throws IOException {
    FileStatus status = fs.getFileStatus(filePath);
    return fs.getPathHandle(status);
  }

  private long totalPartsLen(List<Path> partHandles) throws IOException {
    long totalLen = 0;
    for (Path p : partHandles) {
      totalLen += fs.getFileStatus(p).getLen();
    }
    return totalLen;
  }

  @Override
  public CompletableFuture<PathHandle> complete(
      UploadHandle uploadId,
      Path filePath,
      Map<Integer, PartHandle> handleMap) throws IOException {

    checkPath(filePath);
    return FutureIO.eval(() ->
        innerComplete(uploadId, filePath, handleMap));
  }

  /**
   * The upload complete operation.
   * @param multipartUploadId the ID of the upload
   * @param filePath path
   * @param handleMap map of handles
   * @return the path handle
   * @throws IOException failure
   */
  private PathHandle innerComplete(
      UploadHandle multipartUploadId, Path filePath,
      Map<Integer, PartHandle> handleMap) throws IOException {

    checkPath(filePath);

    checkUploadId(multipartUploadId.toByteArray());

    checkPartHandles(handleMap);
    List<Map.Entry<Integer, PartHandle>> handles =
        new ArrayList<>(handleMap.entrySet());
    handles.sort(Comparator.comparingInt(Map.Entry::getKey));

    List<Path> partHandles = handles
        .stream()
        .map(pair -> {
          byte[] byteArray = pair.getValue().toByteArray();
          return new Path(new String(byteArray, 0, byteArray.length,
              Charsets.UTF_8));
        })
        .collect(Collectors.toList());

    int count = partHandles.size();
    // Built up to identify duplicates: if the size of this set is
    // below the number of parts, then there is a duplicate entry.
    Set<Path> values = new HashSet<>(count);
    values.addAll(partHandles);
    Preconditions.checkArgument(values.size() == count,
        "Duplicate PartHandles");
    byte[] uploadIdByteArray = multipartUploadId.toByteArray();
    Path collectorPath = new Path(new String(uploadIdByteArray, 0,
        uploadIdByteArray.length, Charsets.UTF_8));

    boolean emptyFile = totalPartsLen(partHandles) == 0;
    if (emptyFile) {
      fs.create(filePath).close();
    } else {
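      // Concatenate all parts into a zero-byte file created inside the
      // collector directory, then rename the result over the destination.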
      Path filePathInsideCollector = mergePaths(collectorPath,
          new Path(Path.SEPARATOR + filePath.getName()));
      fs.create(filePathInsideCollector).close();
      fs.concat(filePathInsideCollector,
          partHandles.toArray(new Path[handles.size()]));
      new InternalOperations()
          .rename(fs, filePathInsideCollector, filePath,
              Options.Rename.OVERWRITE);
    }
    fs.delete(collectorPath, true);
    return getPathHandle(filePath);
  }

  @Override
  public CompletableFuture<Void> abort(UploadHandle uploadId,
      Path filePath)
      throws IOException {
    checkPath(filePath);
    byte[] uploadIdByteArray = uploadId.toByteArray();
    checkUploadId(uploadIdByteArray);
    Path collectorPath = new Path(new String(uploadIdByteArray, 0,
        uploadIdByteArray.length, Charsets.UTF_8));

    return FutureIO.eval(() -> {
      // force a check for a file existing; raises FNFE if not found
      fs.getFileStatus(collectorPath);
      fs.delete(collectorPath, true);
      return null;
    });
  }

}
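
To see how the three stages fit together, here is a minimal usage sketch of the public MultipartUploader API. The class name MultipartUploadDemo, the paths, and the sample data are hypothetical; the sketch assumes a Hadoop 3.3+ client (where FileSystem#createMultipartUploader is available) and a target filesystem that supports concat(), such as HDFS, since FileSystemMultipartUploader relies on concat to stitch the parts together.

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.MultipartUploader;
import org.apache.hadoop.fs.PartHandle;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathHandle;
import org.apache.hadoop.fs.UploadHandle;

public class MultipartUploadDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path dest = new Path("/tmp/multipart-demo/data.bin"); // hypothetical path

    // Build an uploader for the target path; on HDFS this resolves to
    // FileSystemMultipartUploader via FileSystemMultipartUploaderBuilder.
    try (MultipartUploader uploader =
             fs.createMultipartUploader(dest).build()) {

      // Stage 1: init - creates the _multipart_<uuid> collector directory.
      UploadHandle upload = uploader.startUpload(dest).get();

      // Stage 2: put parts - each part lands as <partNumber>.part under
      // the collector directory; part numbers start at 1.
      byte[] data = "hello, multipart".getBytes(StandardCharsets.UTF_8);
      Map<Integer, PartHandle> parts = new HashMap<>();
      try (InputStream in = new ByteArrayInputStream(data)) {
        parts.put(1, uploader.putPart(upload, 1, dest, in, data.length).get());
      }

      // Stage 3: complete - concat the parts, rename the result to the
      // destination, and delete the collector directory.
      PathHandle handle = uploader.complete(upload, dest, parts).get();
      System.out.println("completed upload: " + handle);
    }
  }
}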

Related information

hadoop source code directory

Related articles

hadoop AbstractFSBuilderImpl source code

hadoop AbstractMultipartUploader source code

hadoop CombinedFileRange source code

hadoop FileRangeImpl source code

hadoop FileSystemMultipartUploaderBuilder source code

hadoop FsLinkResolution source code

hadoop FunctionsRaisingIOE source code

hadoop FutureDataInputStreamBuilderImpl source code

hadoop FutureIOSupport source code

hadoop MultipartUploaderBuilderImpl source code
