greenplumn s3key_reader 源码

  • 2022-08-18
  • 浏览 (466)

greenplumn s3key_reader 代码

文件路径:/gpcontrib/gpcloud/src/s3key_reader.cpp

#include "s3key_reader.h"

// Hand out the next (offset, length) range to download.
// Once the whole key has been handed out, the result is (keySize, 0).
// The cursor is advanced while holding offsetLock.
Range OffsetMgr::getNextOffset() {
    Range next;

    pthread_mutex_lock(&this->offsetLock);

    // Never report an offset beyond the end of the key.
    next.offset = std::min(this->curPos, this->keySize);

    // The last range is shortened so that it stops exactly at the end of the key.
    const bool isTailChunk = (this->curPos + this->chunkSize > this->keySize);
    if (isTailChunk) {
        next.length = this->keySize - this->curPos;
        this->curPos = this->keySize;
    } else {
        next.length = this->chunkSize;
        this->curPos += this->chunkSize;
    }

    pthread_mutex_unlock(&this->offsetLock);

    return next;
}

// Build a chunk buffer bound to the given URL and to the reader's offset manager.
// The buffer starts empty (ReadyToFill) and immediately reserves its first
// download range from the offset manager.
ChunkBuffer::ChunkBuffer(const S3Url& s3Url, S3KeyReader& reader, const S3MemoryContext& context)
    : s3Url(s3Url), chunkData(context), offsetMgr(reader.getOffsetMgr()), sharedKeyReader(reader) {
    // Synchronisation primitives guarding `status`.
    pthread_mutex_init(&this->statusMutex, NULL);
    pthread_cond_init(&this->statusCondVar, NULL);

    // The S3 interface is attached later; nothing has been downloaded or consumed yet.
    this->s3Interface = NULL;
    this->status = ReadyToFill;
    this->eof = false;
    this->curChunkOffset = 0;

    // Reserve the first range of the key this buffer is responsible for.
    const Range firstRange = this->offsetMgr.getNextOffset();
    this->curFileOffset = firstRange.offset;
    this->chunkDataSize = firstRange.length;
}

// Release the pthread primitives created by the constructor.
ChunkBuffer::~ChunkBuffer() {
    pthread_cond_destroy(&this->statusCondVar);
    pthread_mutex_destroy(&this->statusMutex);
}

// Copy the URL and the bookkeeping fields (offsets, size, status, EOF flag) from `other`.
// No other member is assigned here.
ChunkBuffer& ChunkBuffer::operator=(const ChunkBuffer& other) {
    s3Url = other.s3Url;

    // Position of the chunk within the key and of the read cursor within the chunk.
    curFileOffset = other.curFileOffset;
    chunkDataSize = other.chunkDataSize;
    curChunkOffset = other.curChunkOffset;

    // State flags.
    status = other.status;
    eof = other.eof;

    return *this;
}

// ret < len means EMPTY
// that's why it checks if leftLen is larger than *or equal to* len below[1], provides a chance ret
// is 0, which is smaller than len. Otherwise, other functions won't know when to read next buffer.
uint64_t ChunkBuffer::read(char* buf, uint64_t len) {
    // GPDB abort signal stops s3_import(), this check is not needed if s3_import() every time calls
    // ChunkBuffer->Read() only once, otherwise(as we did in downstreamReader->read() for
    // decompression feature before), first call sets buffer to ReadyToFill, second call hangs.
    S3_CHECK_OR_DIE(!S3QueryIsAbortInProgress(), S3QueryAbort, "");

    UniqueLock statusLock(&this->statusMutex);
    while (this->status != ReadyToRead) {
        pthread_cond_wait(&this->statusCondVar, &this->statusMutex);
    }

    // Error is shared between all chunks.
    if (this->isError()) {
        return 0;
    }

    uint64_t leftLen = this->chunkDataSize - this->curChunkOffset;
    uint64_t lenToRead = std::min(len, leftLen);

    if (lenToRead != 0) {
        memcpy(buf, this->chunkData.data() + this->curChunkOffset, lenToRead);
    }

    if (len <= leftLen) {                   // [1]
        this->curChunkOffset += lenToRead;  // not empty
    } else {                                // empty, reset everything
        this->curChunkOffset = 0;

        if (!this->isEOF()) {
            // Release chunkData memory to reduce consumption.
            this->chunkData.release();

            this->status = ReadyToFill;

            Range range = this->offsetMgr.getNextOffset();
            this->curFileOffset = range.offset;
            this->chunkDataSize = range.length;

            pthread_cond_signal(&this->statusCondVar);
        }
    }

    return lenToRead;
}

// returning uint64_t(-1) means error
uint64_t ChunkBuffer::fill() {
    UniqueLock statusLock(&this->statusMutex);

    while (this->status != ReadyToFill) {
        pthread_cond_wait(&this->statusCondVar, &this->statusMutex);
    }

    if (S3QueryIsAbortInProgress() || this->isError()) {
        this->setSharedError(true);
        this->status = ReadyToRead;
        pthread_cond_signal(&this->statusCondVar);
        return -1;
    }

    uint64_t offset = this->curFileOffset;
    uint64_t leftLen = this->chunkDataSize;

    uint64_t readLen = 0;

    if (leftLen != 0) {
        try {
            readLen = this->s3Interface->fetchData(offset, this->chunkData, leftLen, this->s3Url);
            if (readLen != leftLen) {
                S3DEBUG("Failed to fetch expected data from S3");
                this->setSharedError(true, S3PartialResponseError(leftLen, readLen));
            } else {
                S3DEBUG("Got %" PRIu64 " bytes from S3", readLen);
            }
        } catch (S3Exception& e) {
            S3DEBUG("Failed to fetch expected data from S3");
            this->setSharedError(true);
        }
    }

    if (offset + leftLen >= offsetMgr.getKeySize()) {
        readLen = 0;  // Nothing to read, EOF
        S3DEBUG("Reached the end of file");
        this->eof = true;
    }

    this->status = ReadyToRead;
    pthread_cond_signal(&this->statusCondVar);

    return (this->isError()) ? -1 : readLen;
}

static void* DownloadThreadFunc(void* data) {
    MaskThreadSignals();

    ChunkBuffer* buffer = static_cast<ChunkBuffer*>(data);

    uint64_t filledSize = 0;
    S3DEBUG("Downloading thread starts");
    do {
        if (S3QueryIsAbortInProgress()) {
            S3INFO("Downloading thread is interrupted");

            // error is shared between all chunks, so all chunks will stop.
            buffer->setSharedError(true, S3QueryAbort("Downloading thread is interrupted"));

            // have to unlock ChunkBuffer::read in some certain conditions, for instance, status is
            // not ReadyToRead, and read() is waiting for signal stat_cond.
            buffer->setStatus(ReadyToRead);
            pthread_cond_signal(buffer->getStatCond());

            return NULL;
        }

        filledSize = buffer->fill();

        if (filledSize != 0) {
            if (buffer->isError()) {
                S3DEBUG("Failed to fill downloading buffer");
                break;
            } else {
                S3DEBUG("Size of filled data is %" PRIu64, filledSize);
            }
        }
    } while (!buffer->isEOF());
    S3DEBUG("Downloading thread ended");
    return NULL;
}

// Prepare the reader for one key: configure the offset manager from `params`, create
// `numOfChunks` chunk buffers and start one downloading thread per buffer.
// Throws S3RuntimeError if the S3 interface is missing, if the chunk count or chunk size
// is zero, or if a downloading thread cannot be created.
void S3KeyReader::open(const S3Params& params) {
    S3_CHECK_OR_DIE(this->s3Interface != NULL, S3RuntimeError, "s3Interface must not be NULL");

    this->sharedError = false;

    this->numOfChunks = params.getNumOfChunks();
    S3_CHECK_OR_DIE(this->numOfChunks > 0, S3RuntimeError, "numOfChunks must not be zero");

    this->offsetMgr.setKeySize(params.getKeySize());
    this->offsetMgr.setChunkSize(params.getChunkSize());

    S3_CHECK_OR_DIE(params.getChunkSize() > 0, S3RuntimeError,
                    "chunk size must be greater than zero");

    // Reserve up front: the threads below keep pointers to the elements, so the
    // container must not reallocate while buffers are being added.
    this->chunkBuffers.reserve(this->numOfChunks);

    for (uint64_t i = 0; i < this->numOfChunks; i++) {
        this->chunkBuffers.emplace_back(params.getS3Url(), *this, params.getMemoryContext());
    }

    for (uint64_t i = 0; i < this->numOfChunks; i++) {
        this->chunkBuffers[i].setS3InterfaceService(this->s3Interface);

        pthread_t thread;
        const int rc = pthread_create(&thread, NULL, DownloadThreadFunc, &this->chunkBuffers[i]);

        // On failure `thread` is not a valid thread ID; recording it would make close()
        // call pthread_join() on garbage. Threads started so far are already recorded in
        // `threads`, so close() can still join them.
        S3_CHECK_OR_DIE(rc == 0, S3RuntimeError, "failed to create downloading thread");

        this->threads.push_back(thread);
    }
}

// Read up to `count` bytes of the key into `buf`, taking data from the chunk buffers in
// round-robin order.
//
// Returns the number of bytes written to `buf`. Once the whole key has been transferred,
// one extra call returns the contents of `eolString` if the key did not end with '\r' or
// '\n'; after that the function returns 0.
//
// If the shared error flag is set after reading from a chunk, the stored exception is
// rethrown, or S3RuntimeError is thrown when no exception was stored.
uint64_t S3KeyReader::read(char* buf, uint64_t count) {
    uint64_t fileLen = this->offsetMgr.getKeySize();
    uint64_t readLen = 0;

    do {
        // confirm there is no more available data, done with this file
        if (this->transferredKeyLen >= fileLen) {
            // Append the EOL string exactly once when the key did not already end with one.
            // NOTE(review): `count` is not compared with eolLen before the copy — confirm
            // callers always pass a buffer at least as large as eolString.
            if (!this->hasEol && !this->eolAppended) {
                uint64_t eolLen = strlen(eolString);
                memcpy(buf, eolString, eolLen);

                this->eolAppended = true;

                return eolLen;
            }

            return 0;
        }

        // Chunk buffers are consumed in round-robin order.
        ChunkBuffer& buffer = chunkBuffers[this->curReadingChunk % this->numOfChunks];

        readLen = buffer.read(buf, count);

        // The error flag is checked after every chunk read; a failure is surfaced to the
        // caller as an exception.
        if (this->isSharedError()) {
            if (this->sharedException != NULL) {
                std::rethrow_exception(this->sharedException);
            } else {
                throw S3RuntimeError("Unexpected runtime error, sharedException is NULL");
            }
        }

        this->transferredKeyLen += readLen;
        // The last byte of the key has just been delivered: remember whether it is already
        // a line terminator. readLen cannot be 0 here, because transferredKeyLen was below
        // fileLen before the addition.
        if (this->transferredKeyLen == fileLen) {
            if (buf[readLen - 1] == '\r' || buf[readLen - 1] == '\n') {
                this->hasEol = true;
            }
        }

        // A short read means the current chunk is exhausted; move on to the next one.
        if (readLen < count) {
            this->curReadingChunk++;
        }

        count -= readLen;
    } while (readLen == 0);  // retry to confirm whether thread reading is finished or chunk size is
                             // divisible by get()'s buffer size

    return readLen;
}

// reset marks before reading next key
void S3KeyReader::reset() {
    this->sharedError = false;
    this->curReadingChunk = 0;
    this->transferredKeyLen = 0;

    this->offsetMgr.reset();

    this->chunkBuffers.clear();
    this->threads.clear();

    this->hasEol = false;
    this->eolAppended = false;
}

// Stop all downloading threads, wait for them to finish and reset the reader.
void S3KeyReader::close() {
    // To interrupt the downloading threads (see ChunkBuffer::fill()) we must:
    // 1. set the shared error status so that a woken thread does not continue, and
    // 2. set every buffer's status to ReadyToFill and signal its condition variable.
    this->sharedError = true;

    for (auto& chunk : this->chunkBuffers) {
        UniqueLock lock(chunk.getStatMutex());
        chunk.setStatus(ReadyToFill);
        pthread_cond_signal(chunk.getStatCond());
    }

    // Join every thread that has not been joined yet; a zero handle marks a joined slot.
    for (auto& thread : this->threads) {
        if (thread != 0) {
            pthread_join(thread, NULL);
            thread = 0;
        }
    }

    this->reset();
}

相关信息

greenplumn 源码目录

相关文章

greenplumn compress_writer 源码

greenplumn decompress_reader 源码

greenplumn gpcloud 源码

greenplumn gpreader 源码

greenplumn gpwriter 源码

greenplumn s3bucket_reader 源码

greenplumn s3common_reader 源码

greenplumn s3common_writer 源码

greenplumn s3conf 源码

greenplumn s3http_headers 源码

0  赞