Greenplum s3key_reader source code
File path: /gpcontrib/gpcloud/src/s3key_reader.cpp
#include "s3key_reader.h"
// Return the (offset, length) of the next chunk to download,
// or (fileSize, 0) once the end of the file has been reached.
Range OffsetMgr::getNextOffset() {
    Range ret;
    pthread_mutex_lock(&this->offsetLock);
    ret.offset = std::min(this->curPos, this->keySize);
    if (this->curPos + this->chunkSize > this->keySize) {
        ret.length = this->keySize - this->curPos;
        this->curPos = this->keySize;
    } else {
        ret.length = this->chunkSize;
        this->curPos += this->chunkSize;
    }
    pthread_mutex_unlock(&this->offsetLock);
    return ret;
}
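As a worked example, with keySize = 10 and chunkSize = 4 successive calls return (0, 4), (4, 4), (8, 2), and then (10, 0) from there on. The following standalone sketch restates the same arithmetic with illustrative names (no locking; these are not the gpcloud types):

    #include <algorithm>
    #include <cstdint>
    #include <cstdio>

    // Illustrative restatement of OffsetMgr::getNextOffset()'s arithmetic.
    struct SimpleRange {
        uint64_t offset;
        uint64_t length;
    };

    struct SimpleOffsetMgr {
        uint64_t curPos;
        uint64_t keySize;
        uint64_t chunkSize;

        SimpleRange next() {
            SimpleRange r = {std::min(curPos, keySize), 0};
            if (curPos + chunkSize > keySize) {
                r.length = keySize - curPos;  // last (possibly short) chunk, or 0 past EOF
                curPos = keySize;
            } else {
                r.length = chunkSize;
                curPos += chunkSize;
            }
            return r;
        }
    };

    int main() {
        SimpleOffsetMgr mgr = {0, 10, 4};  // curPos = 0, keySize = 10, chunkSize = 4
        for (int i = 0; i < 4; i++) {
            SimpleRange r = mgr.next();
            std::printf("(%llu, %llu)\n", (unsigned long long)r.offset,
                        (unsigned long long)r.length);
        }
        // Prints: (0, 4) (4, 4) (8, 2) (10, 0)
        return 0;
    }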
ChunkBuffer::ChunkBuffer(const S3Url& s3Url, S3KeyReader& reader, const S3MemoryContext& context)
    : s3Url(s3Url), chunkData(context), offsetMgr(reader.getOffsetMgr()), sharedKeyReader(reader) {
    s3Interface = NULL;
    Range range = offsetMgr.getNextOffset();
    curFileOffset = range.offset;
    chunkDataSize = range.length;
    status = ReadyToFill;
    eof = false;
    curChunkOffset = 0;
    pthread_mutex_init(&this->statusMutex, NULL);
    pthread_cond_init(&this->statusCondVar, NULL);
}
ChunkBuffer::~ChunkBuffer() {
    pthread_mutex_destroy(&this->statusMutex);
    pthread_cond_destroy(&this->statusCondVar);
}
// The status mutex, condition variable and chunkData buffer are not copied here.
ChunkBuffer& ChunkBuffer::operator=(const ChunkBuffer& other) {
    this->s3Url = other.s3Url;
    this->eof = other.eof;
    this->status = other.status;
    this->curFileOffset = other.curFileOffset;
    this->curChunkOffset = other.curChunkOffset;
    this->chunkDataSize = other.chunkDataSize;
    return *this;
}
// A return value smaller than len means this chunk is EMPTY (exhausted).
// That is why the check at [1] below tests whether leftLen is larger than *or equal to* len:
// it allows the return value to be 0, which is smaller than len, so that callers can tell
// when it is time to move on to the next buffer.
uint64_t ChunkBuffer::read(char* buf, uint64_t len) {
    // A GPDB abort signal stops s3_import(). This check would be unnecessary if s3_import()
    // called ChunkBuffer::read() only once per invocation; otherwise (as the old
    // downstreamReader->read() path for the decompression feature did), the first call sets
    // the buffer back to ReadyToFill and the second call hangs.
    S3_CHECK_OR_DIE(!S3QueryIsAbortInProgress(), S3QueryAbort, "");

    UniqueLock statusLock(&this->statusMutex);
    while (this->status != ReadyToRead) {
        pthread_cond_wait(&this->statusCondVar, &this->statusMutex);
    }

    // The error flag is shared between all chunks.
    if (this->isError()) {
        return 0;
    }

    uint64_t leftLen = this->chunkDataSize - this->curChunkOffset;
    uint64_t lenToRead = std::min(len, leftLen);
    if (lenToRead != 0) {
        memcpy(buf, this->chunkData.data() + this->curChunkOffset, lenToRead);
    }

    if (len <= leftLen) {  // [1]
        this->curChunkOffset += lenToRead;  // not empty
    } else {  // empty, reset everything
        this->curChunkOffset = 0;
        if (!this->isEOF()) {
            // Release chunkData memory to reduce consumption.
            this->chunkData.release();
            this->status = ReadyToFill;

            Range range = this->offsetMgr.getNextOffset();
            this->curFileOffset = range.offset;
            this->chunkDataSize = range.length;
            pthread_cond_signal(&this->statusCondVar);
        }
    }

    return lenToRead;
}
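Each ChunkBuffer acts as a single-slot queue between one download thread (the producer, in fill()) and the reading thread (the consumer, here in read()): status toggles between ReadyToFill and ReadyToRead under statusMutex, and every transition is announced on statusCondVar. A minimal standalone sketch of that handshake, using std::thread and std::condition_variable instead of pthreads for brevity:

    #include <condition_variable>
    #include <cstdio>
    #include <mutex>
    #include <string>
    #include <thread>

    // Single-slot producer/consumer handshake: the producer waits for ReadyToFill,
    // publishes data and flips the status to ReadyToRead; the consumer waits for
    // ReadyToRead, drains the data and flips the status back.
    enum Status { ReadyToFill, ReadyToRead };

    struct Slot {
        std::mutex m;
        std::condition_variable cv;
        Status status;
        std::string data;
    };

    int main() {
        Slot slot;
        slot.status = ReadyToFill;

        std::thread producer([&slot] {
            for (int i = 0; i < 3; i++) {
                std::unique_lock<std::mutex> lk(slot.m);
                slot.cv.wait(lk, [&slot] { return slot.status == ReadyToFill; });
                slot.data = "chunk " + std::to_string(i);  // stands in for fetchData()
                slot.status = ReadyToRead;
                slot.cv.notify_one();
            }
        });

        for (int i = 0; i < 3; i++) {
            std::unique_lock<std::mutex> lk(slot.m);
            slot.cv.wait(lk, [&slot] { return slot.status == ReadyToRead; });
            std::printf("consumed %s\n", slot.data.c_str());
            slot.status = ReadyToFill;  // hand the slot back to the producer
            slot.cv.notify_one();
        }

        producer.join();
        return 0;
    }

In the real code there are numOfChunks such slots, and S3KeyReader::read() walks them round-robin, so each download thread can prefetch its next range while earlier chunks are still being consumed.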
// Returning uint64_t(-1) means error.
uint64_t ChunkBuffer::fill() {
    UniqueLock statusLock(&this->statusMutex);
    while (this->status != ReadyToFill) {
        pthread_cond_wait(&this->statusCondVar, &this->statusMutex);
    }

    if (S3QueryIsAbortInProgress() || this->isError()) {
        this->setSharedError(true);
        this->status = ReadyToRead;
        pthread_cond_signal(&this->statusCondVar);
        return -1;
    }

    uint64_t offset = this->curFileOffset;
    uint64_t leftLen = this->chunkDataSize;
    uint64_t readLen = 0;

    if (leftLen != 0) {
        try {
            readLen = this->s3Interface->fetchData(offset, this->chunkData, leftLen, this->s3Url);
            if (readLen != leftLen) {
                S3DEBUG("Failed to fetch expected data from S3");
                this->setSharedError(true, S3PartialResponseError(leftLen, readLen));
            } else {
                S3DEBUG("Got %" PRIu64 " bytes from S3", readLen);
            }
        } catch (S3Exception& e) {
            S3DEBUG("Failed to fetch expected data from S3");
            this->setSharedError(true);
        }
    }

    // The range just filled ends at (or past) the end of the key, so mark EOF;
    // the download thread will exit its loop after this iteration.
    if (offset + leftLen >= offsetMgr.getKeySize()) {
        readLen = 0;  // Nothing to read, EOF
        S3DEBUG("Reached the end of file");
        this->eof = true;
    }

    this->status = ReadyToRead;
    pthread_cond_signal(&this->statusCondVar);

    return (this->isError()) ? -1 : readLen;
}
static void* DownloadThreadFunc(void* data) {
    MaskThreadSignals();

    ChunkBuffer* buffer = static_cast<ChunkBuffer*>(data);
    uint64_t filledSize = 0;
    S3DEBUG("Downloading thread starts");
    do {
        if (S3QueryIsAbortInProgress()) {
            S3INFO("Downloading thread is interrupted");

            // The error flag is shared between all chunks, so all chunks will stop.
            buffer->setSharedError(true, S3QueryAbort("Downloading thread is interrupted"));

            // We also have to unblock ChunkBuffer::read() in certain situations, for instance
            // when the status is not ReadyToRead and read() is waiting on the status
            // condition variable.
            buffer->setStatus(ReadyToRead);
            pthread_cond_signal(buffer->getStatCond());
            return NULL;
        }

        filledSize = buffer->fill();

        if (filledSize != 0) {
            if (buffer->isError()) {
                S3DEBUG("Failed to fill downloading buffer");
                break;
            } else {
                S3DEBUG("Size of filled data is %" PRIu64, filledSize);
            }
        }
    } while (!buffer->isEOF());

    S3DEBUG("Downloading thread ended");
    return NULL;
}
void S3KeyReader::open(const S3Params& params) {
    S3_CHECK_OR_DIE(this->s3Interface != NULL, S3RuntimeError, "s3Interface must not be NULL");

    this->sharedError = false;

    this->numOfChunks = params.getNumOfChunks();
    S3_CHECK_OR_DIE(this->numOfChunks > 0, S3RuntimeError, "numOfChunks must not be zero");

    this->offsetMgr.setKeySize(params.getKeySize());
    this->offsetMgr.setChunkSize(params.getChunkSize());

    S3_CHECK_OR_DIE(params.getChunkSize() > 0, S3RuntimeError,
                    "chunk size must be greater than zero");

    this->chunkBuffers.reserve(this->numOfChunks);

    for (uint64_t i = 0; i < this->numOfChunks; i++) {
        this->chunkBuffers.emplace_back(params.getS3Url(), *this, params.getMemoryContext());
    }

    for (uint64_t i = 0; i < this->numOfChunks; i++) {
        this->chunkBuffers[i].setS3InterfaceService(this->s3Interface);

        pthread_t thread;
        pthread_create(&thread, NULL, DownloadThreadFunc, &this->chunkBuffers[i]);
        this->threads.push_back(thread);
    }
}
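Note the ordering in open(): chunkBuffers is reserved to its final size, the buffers are emplaced, and only then does each download thread receive a raw pointer into the vector (&this->chunkBuffers[i]); once those pointers are handed out the vector must not reallocate. A standalone sketch of the same one-thread-per-buffer pattern (illustrative types, not the gpcloud ones):

    #include <pthread.h>
    #include <cstdio>
    #include <vector>

    // One worker thread per buffer, each receiving a pointer to "its" element,
    // mirroring the structure of S3KeyReader::open().
    struct FakeBuffer {
        int id;
    };

    static void* workerFunc(void* arg) {
        FakeBuffer* buffer = static_cast<FakeBuffer*>(arg);
        std::printf("worker for buffer %d running\n", buffer->id);
        return NULL;
    }

    int main() {
        const size_t numOfChunks = 4;

        std::vector<FakeBuffer> buffers;
        buffers.reserve(numOfChunks);  // no reallocation once pointers are handed out
        for (size_t i = 0; i < numOfChunks; i++) {
            buffers.push_back(FakeBuffer{static_cast<int>(i)});
        }

        std::vector<pthread_t> threads;
        for (size_t i = 0; i < buffers.size(); i++) {
            pthread_t thread;
            pthread_create(&thread, NULL, workerFunc, &buffers[i]);
            threads.push_back(thread);
        }

        for (size_t i = 0; i < threads.size(); i++) {
            pthread_join(threads[i], NULL);
        }
        return 0;
    }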
uint64_t S3KeyReader::read(char* buf, uint64_t count) {
    uint64_t fileLen = this->offsetMgr.getKeySize();
    uint64_t readLen = 0;

    do {
        // No more data is available: we are done with this file. If the data did not end
        // with a newline, append eolString exactly once.
        if (this->transferredKeyLen >= fileLen) {
            if (!this->hasEol && !this->eolAppended) {
                uint64_t eolLen = strlen(eolString);
                memcpy(buf, eolString, eolLen);
                this->eolAppended = true;
                return eolLen;
            }
            return 0;
        }

        ChunkBuffer& buffer = chunkBuffers[this->curReadingChunk % this->numOfChunks];

        readLen = buffer.read(buf, count);

        if (this->isSharedError()) {
            if (this->sharedException != NULL) {
                std::rethrow_exception(this->sharedException);
            } else {
                throw S3RuntimeError("Unexpected runtime error, sharedException is NULL");
            }
        }

        this->transferredKeyLen += readLen;

        if (this->transferredKeyLen == fileLen) {
            if (buf[readLen - 1] == '\r' || buf[readLen - 1] == '\n') {
                this->hasEol = true;
            }
        }

        if (readLen < count) {  // current chunk is exhausted, move on to the next buffer
            this->curReadingChunk++;
        }

        count -= readLen;
    } while (readLen == 0);  // A zero-length read may just mean the current chunk is exhausted
                             // (e.g. when the chunk size is divisible by the caller's buffer
                             // size), so retry to confirm whether the file is really finished.

    return readLen;
}
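read() serves the caller from buffer curReadingChunk % numOfChunks and only advances to the next buffer once the current chunk is drained, while each drained buffer immediately asks OffsetMgr for its next range. The net effect is that buffer i always serves chunks i, i+N, i+2N, ..., so the parallel downloads are reassembled in file order. A small illustration of that mapping (not gpcloud code):

    #include <algorithm>
    #include <cstdint>
    #include <cstdio>

    // Which byte range each buffer serves when chunks are handed out in order
    // and consumed round-robin across numOfChunks buffers.
    int main() {
        const uint64_t keySize = 22, chunkSize = 4, numOfChunks = 3;

        uint64_t offset = 0;
        for (uint64_t chunk = 0; offset < keySize; chunk++) {
            uint64_t len = std::min(chunkSize, keySize - offset);
            std::printf("chunk %llu -> buffer %llu -> bytes [%llu, %llu)\n",
                        (unsigned long long)chunk,
                        (unsigned long long)(chunk % numOfChunks),
                        (unsigned long long)offset,
                        (unsigned long long)(offset + len));
            offset += len;
        }
        return 0;
    }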
// Reset state before reading the next key.
void S3KeyReader::reset() {
    this->sharedError = false;
    this->curReadingChunk = 0;
    this->transferredKeyLen = 0;
    this->offsetMgr.reset();
    this->chunkBuffers.clear();
    this->threads.clear();
    this->hasEol = false;
    this->eolAppended = false;
}
void S3KeyReader::close() {
    // To interrupt the downloading threads we must (see ChunkBuffer::fill()):
    //   1. set the status to ReadyToFill and signal the condition variable;
    //   2. set the shared error flag to prevent the download threads from continuing.
    this->sharedError = true;

    for (uint64_t i = 0; i < this->chunkBuffers.size(); i++) {
        UniqueLock lock(this->chunkBuffers[i].getStatMutex());
        this->chunkBuffers[i].setStatus(ReadyToFill);
        pthread_cond_signal(this->chunkBuffers[i].getStatCond());
    }

    for (uint64_t i = 0; i < this->threads.size(); i++) {
        if (this->threads[i] == 0) {
            continue;
        }
        pthread_join(this->threads[i], NULL);
        this->threads[i] = 0;
    }

    this->reset();
}
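The reason close() forces the status and signals before joining is that a download thread may be parked inside fill(), in pthread_cond_wait(), waiting for a ReadyToFill that will never arrive on its own; a waiter only re-checks its predicate after the state has changed and the condition variable has been signalled. A minimal sketch of that wake-for-shutdown pattern (std::thread for brevity):

    #include <condition_variable>
    #include <cstdio>
    #include <mutex>
    #include <thread>

    // Waking a worker that is blocked waiting for work so it can be joined:
    // set the stop flag, force the awaited condition, then notify.
    int main() {
        std::mutex m;
        std::condition_variable cv;
        bool readyToFill = false;  // stands in for status == ReadyToFill
        bool stop = false;         // stands in for the shared error flag

        std::thread worker([&] {
            std::unique_lock<std::mutex> lk(m);
            cv.wait(lk, [&] { return readyToFill; });  // like fill() waiting for work
            if (stop) {
                std::printf("worker: shutdown requested, exiting\n");
                return;
            }
            std::printf("worker: would download a chunk here\n");
        });

        {
            std::unique_lock<std::mutex> lk(m);
            stop = true;         // step 2 in close(): set the shared error flag
            readyToFill = true;  // step 1 in close(): force the awaited status
            cv.notify_one();
        }

        worker.join();  // the join can no longer deadlock
        return 0;
    }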