greenplum s3key_reader 源码

  • 2022-08-18
  • 浏览 (464)

greenplum s3key_reader 代码

文件路径:/gpcontrib/gpcloud/include/s3key_reader.h

#ifndef INCLUDE_S3KEY_READER_H_
#define INCLUDE_S3KEY_READER_H_

#include "reader.h"
#include "s3common_headers.h"
#include "s3exception.h"
#include "s3interface.h"

// An (offset, length) pair. It is the return type of
// OffsetMgr::getNextOffset(), where a length of 0 means EOF.
// NOTE(review): presumably both fields are byte counts within the S3 key —
// confirm against the getNextOffset() implementation.
struct Range {
    uint64_t offset;  // start of the range
    uint64_t length;  // size of the range; 0 signals EOF from getNextOffset()
};

// Tracks the size of an S3 key, the chunk size, and a current position,
// and hands out ranges through getNextOffset().
//
// The mutex is created in the constructor and destroyed in the destructor.
// None of the inline accessors below take it; presumably getNextOffset(),
// which is defined outside this header, does — TODO confirm.
//
// NOTE(review): no copy operations are declared, so a compiler-generated
// copy would duplicate offsetLock as a raw value. Confirm that instances
// are never copied.
class OffsetMgr {
   public:
    // All counters start at zero (see the member initializers below).
    OffsetMgr() {
        pthread_mutex_init(&offsetLock, nullptr);
    }

    ~OffsetMgr() {
        pthread_mutex_destroy(&offsetLock);
    }

    // Returns the next range; ret.length == 0 means EOF.
    Range getNextOffset();

    // Size of the S3 key (file).
    uint64_t getKeySize() const {
        return keySize;
    }

    void setKeySize(uint64_t keySize) {
        this->keySize = keySize;
    }

    // Chunk size.
    uint64_t getChunkSize() const {
        return chunkSize;
    }

    void setChunkSize(uint64_t chunkSize) {
        this->chunkSize = chunkSize;
    }

    // Current position.
    uint64_t getCurPos() const {
        return curPos;
    }

    void setCurPos(uint64_t curPos) {
        this->curPos = curPos;
    }

    // Puts position, chunk size and key size back to zero.
    void reset() {
        curPos = 0;
        chunkSize = 0;
        keySize = 0;
    }

   private:
    pthread_mutex_t offsetLock;
    uint64_t keySize = 0;  // size of S3 key(file)
    uint64_t chunkSize = 0;
    uint64_t curPos = 0;
};

// State flag stored in each ChunkBuffer (see ChunkBuffer::setStatus() and
// ChunkBuffer::getStatus()).
// NOTE(review): judging by the names, ReadyToRead marks a chunk whose data
// can be consumed by read() and ReadyToFill one that fill() may overwrite —
// confirm against the implementation.
enum ChunkStatus {
    ReadyToRead = 0,
    ReadyToFill = 1,
};

// Forward declaration: S3KeyReader refers to ChunkBuffer (it stores a
// vector<ChunkBuffer>), while ChunkBuffer itself is defined after
// S3KeyReader in this header.
class ChunkBuffer;

// Reader implementation for a single S3 key.
//
// It owns an OffsetMgr, a list of ChunkBuffers and a list of pthread
// handles, plus an error flag and an exception_ptr that can be set from
// several threads under mutexErrorMessage. open(), read(), close() and
// reset() are only declared here; their definitions live outside this
// header.
class S3KeyReader : public Reader {
   public:
    S3KeyReader()
        : sharedError(false),
          numOfChunks(0),
          curReadingChunk(0),
          transferredKeyLen(0),
          s3Interface(nullptr),
          hasEol(false),
          eolAppended(false) {
        pthread_mutex_init(&this->mutexErrorMessage, nullptr);
    }

    // Closes the reader before the error mutex is torn down.
    virtual ~S3KeyReader() {
        this->close();
        pthread_mutex_destroy(&this->mutexErrorMessage);
    }

    void open(const S3Params& params);
    uint64_t read(char* buf, uint64_t count);
    void close();

    // Stores the given service pointer. Nothing in this header deletes it,
    // so ownership presumably stays with the caller — TODO confirm.
    void setS3InterfaceService(S3Interface* s3) {
        this->s3Interface = s3;
    }

    // ---- shared error state ----

    // Reads the flag without taking mutexErrorMessage.
    bool isSharedError() const {
        return sharedError;
    }

    // Sets the error flag. If no exception has been stored yet, also stores
    // whatever std::current_exception() returns at the call site; an
    // exception stored earlier is kept.
    void setSharedError(bool sharedError) {
        pthread_mutex_lock(&this->mutexErrorMessage);
        if (this->sharedException == nullptr) {
            this->sharedException = std::current_exception();
        }
        this->sharedError = sharedError;
        pthread_mutex_unlock(&this->mutexErrorMessage);
    }

    // Sets the error flag and stores a copy of `e` as the shared exception.
    // Unlike the overload above, this one replaces any exception that was
    // stored before.
    template <typename E>
    void setSharedError(bool sharedError, const E& e) {
        pthread_mutex_lock(&this->mutexErrorMessage);
        try {
            // Throw-and-catch is how `e` gets turned into an exception_ptr.
            throw e;
        } catch (...) {
            this->sharedException = std::current_exception();
        }
        this->sharedError = sharedError;
        pthread_mutex_unlock(&this->mutexErrorMessage);
    }

    // ---- read-only accessors ----

    const vector<ChunkBuffer>& getChunkBuffers() const {
        return chunkBuffers;
    }

    const vector<pthread_t>& getThreads() const {
        return threads;
    }

    uint64_t getCurReadingChunk() const {
        return curReadingChunk;
    }

    uint64_t getTransferredKeyLen() const {
        return transferredKeyLen;
    }

    const string& getRegion() const {
        return region;
    }

    // Mutable access to the offset manager.
    OffsetMgr& getOffsetMgr() {
        return offsetMgr;
    }

   private:
    // Guards sharedException/sharedError inside the setSharedError overloads.
    pthread_mutex_t mutexErrorMessage;

    bool sharedError;

    // exception_ptr is used to store the exception object
    // and share it across threads.
    std::exception_ptr sharedException;

    uint64_t numOfChunks;
    uint64_t curReadingChunk;
    uint64_t transferredKeyLen;
    string region;
    OffsetMgr offsetMgr;

    vector<ChunkBuffer> chunkBuffers;
    vector<pthread_t> threads;

    S3Interface* s3Interface;

    void reset();

    bool hasEol;
    bool eolAppended;
};

class ChunkBuffer {
   public:
    ChunkBuffer(const S3Url& s3Url, S3KeyReader& reader, const S3MemoryContext& context);

    ~ChunkBuffer();

    // if a class has reference member, then it can't be
    // copy assigned by default, we need to implement operator= explicitly.
    // it's needed for vector.
    ChunkBuffer& operator=(const ChunkBuffer& other);

    bool isEOF() {
        return this->eof;
    }

    // Error is shared among all ChunkBuffers of a KeyReader.
    bool isError() {
        return this->sharedKeyReader.isSharedError();
    }

    uint64_t read(char* buf, uint64_t len);
    uint64_t fill();

    void setS3InterfaceService(S3Interface* s3) {
        this->s3Interface = s3;
    }

    pthread_mutex_t* getStatMutex() {
        return &statusMutex;
    }

    pthread_cond_t* getStatCond() {
        return &statusCondVar;
    }

    void setStatus(ChunkStatus status) {
        this->status = status;
    }

    ChunkStatus getStatus() const {
        return status;
    }

    void setSharedError(bool sharedError) {
        this->sharedKeyReader.setSharedError(sharedError);
    }

    template <typename E>
    void setSharedError(bool sharedError, const E& e) {
        this->sharedKeyReader.setSharedError(sharedError, e);
    }

   protected:
    S3Url s3Url;

   private:
    bool eof;

    ChunkStatus status;

    pthread_mutex_t statusMutex;
    pthread_cond_t statusCondVar;

    uint64_t curFileOffset;
    uint64_t curChunkOffset;
    uint64_t chunkDataSize;

    S3VectorUInt8 chunkData;
    OffsetMgr& offsetMgr;
    S3Interface* s3Interface;
    S3KeyReader& sharedKeyReader;
};

#endif /* INCLUDE_S3KEY_READER_H_ */

相关信息

greenplum 源码目录

相关文章

greenplum compress_writer 源码

greenplum decompress_reader 源码

greenplum gpcheckcloud 源码

greenplum gpcommon 源码

greenplum gpreader 源码

greenplum gpwriter 源码

greenplum reader 源码

greenplum restful_service 源码

greenplum s3bucket_reader 源码

greenplum s3common_headers 源码

0  赞