greenplum s3key_reader 源码
greenplum s3key_reader 代码
文件路径:/gpcontrib/gpcloud/include/s3key_reader.h
#ifndef INCLUDE_S3KEY_READER_H_
#define INCLUDE_S3KEY_READER_H_
#include "reader.h"
#include "s3common_headers.h"
#include "s3exception.h"
#include "s3interface.h"
// An (offset, length) pair. OffsetMgr::getNextOffset() returns one of these;
// per its declaration, a returned Range whose length is 0 means EOF.
struct Range {
    uint64_t offset;  // where the range starts
    uint64_t length;  // how long the range is; 0 is the EOF marker
};
// Bookkeeping for walking through an S3 key: the key size, the chunk size and
// the current position, together with a pthread mutex (offsetLock).
//
// In this header the mutex is only initialized and destroyed; none of the
// inline accessors below take it. getNextOffset() is defined elsewhere.
//
// NOTE(review): the class owns a pthread_mutex_t yet is implicitly copyable;
// a copy would duplicate the mutex and destroy it twice. Consider deleting the
// copy operations once it is confirmed no caller copies an OffsetMgr.
class OffsetMgr {
public:
    // Starts with every counter at zero and a default-attribute mutex.
    OffsetMgr() : keySize(0), chunkSize(0), curPos(0) {
        pthread_mutex_init(&offsetLock, nullptr);
    }

    ~OffsetMgr() {
        pthread_mutex_destroy(&offsetLock);
    }

    // Returns the next range; ret.length == 0 means EOF.
    Range getNextOffset();

    // --- key size -----------------------------------------------------------
    uint64_t getKeySize() const { return keySize; }
    void setKeySize(uint64_t keySize) { this->keySize = keySize; }

    // --- chunk size ---------------------------------------------------------
    uint64_t getChunkSize() const { return chunkSize; }
    void setChunkSize(uint64_t chunkSize) { this->chunkSize = chunkSize; }

    // --- current position ---------------------------------------------------
    uint64_t getCurPos() const { return curPos; }
    void setCurPos(uint64_t curPos) { this->curPos = curPos; }

    // Puts all three counters back to zero. The mutex is left as is.
    void reset() {
        curPos = 0;
        chunkSize = 0;
        keySize = 0;
    }

private:
    pthread_mutex_t offsetLock;
    uint64_t keySize;  // size of S3 key(file)
    uint64_t chunkSize;
    uint64_t curPos;
};
// Status value stored in a ChunkBuffer (see ChunkBuffer::setStatus/getStatus).
// Kept as a plain, unscoped enum so the enumerators stay visible at namespace
// scope; the numeric values are the ones the compiler assigned implicitly.
enum ChunkStatus {
    ReadyToRead = 0,  // presumably: buffer holds data that may be read — confirm in the .cpp
    ReadyToFill = 1,  // presumably: buffer may be (re)filled — confirm in the .cpp
};
// Forward declaration: S3KeyReader (below) names ChunkBuffer in
// vector<ChunkBuffer> before ChunkBuffer's full definition, and ChunkBuffer
// in turn holds a reference to S3KeyReader.
class ChunkBuffer;
class S3KeyReader : public Reader {
public:
S3KeyReader()
: sharedError(false),
numOfChunks(0),
curReadingChunk(0),
transferredKeyLen(0),
s3Interface(NULL),
hasEol(false),
eolAppended(false) {
pthread_mutex_init(&this->mutexErrorMessage, NULL);
}
virtual ~S3KeyReader() {
this->close();
pthread_mutex_destroy(&this->mutexErrorMessage);
}
void open(const S3Params& params);
uint64_t read(char* buf, uint64_t count);
void close();
void setS3InterfaceService(S3Interface* s3) {
this->s3Interface = s3;
}
const vector<ChunkBuffer>& getChunkBuffers() const {
return chunkBuffers;
}
uint64_t getCurReadingChunk() const {
return curReadingChunk;
}
bool isSharedError() const {
return sharedError;
}
void setSharedError(bool sharedError) {
pthread_mutex_lock(&this->mutexErrorMessage);
if (this->sharedException == NULL) {
this->sharedException = std::current_exception();
}
this->sharedError = sharedError;
pthread_mutex_unlock(&this->mutexErrorMessage);
}
template <typename E>
void setSharedError(bool sharedError, const E& e) {
pthread_mutex_lock(&this->mutexErrorMessage);
try {
throw e;
} catch (...) {
this->sharedException = std::current_exception();
}
this->sharedError = sharedError;
pthread_mutex_unlock(&this->mutexErrorMessage);
}
const vector<pthread_t>& getThreads() const {
return threads;
}
uint64_t getTransferredKeyLen() const {
return transferredKeyLen;
}
OffsetMgr& getOffsetMgr() {
return offsetMgr;
}
const string& getRegion() const {
return region;
}
private:
pthread_mutex_t mutexErrorMessage;
bool sharedError;
// exception_ptr is used to store exception object
// and share across threads.
std::exception_ptr sharedException;
uint64_t numOfChunks;
uint64_t curReadingChunk;
uint64_t transferredKeyLen;
string region;
OffsetMgr offsetMgr;
vector<ChunkBuffer> chunkBuffers;
vector<pthread_t> threads;
S3Interface* s3Interface;
void reset();
bool hasEol;
bool eolAppended;
};
class ChunkBuffer {
public:
ChunkBuffer(const S3Url& s3Url, S3KeyReader& reader, const S3MemoryContext& context);
~ChunkBuffer();
// if a class has reference member, then it can't be
// copy assigned by default, we need to implement operator= explicitly.
// it's needed for vector.
ChunkBuffer& operator=(const ChunkBuffer& other);
bool isEOF() {
return this->eof;
}
// Error is shared among all ChunkBuffers of a KeyReader.
bool isError() {
return this->sharedKeyReader.isSharedError();
}
uint64_t read(char* buf, uint64_t len);
uint64_t fill();
void setS3InterfaceService(S3Interface* s3) {
this->s3Interface = s3;
}
pthread_mutex_t* getStatMutex() {
return &statusMutex;
}
pthread_cond_t* getStatCond() {
return &statusCondVar;
}
void setStatus(ChunkStatus status) {
this->status = status;
}
ChunkStatus getStatus() const {
return status;
}
void setSharedError(bool sharedError) {
this->sharedKeyReader.setSharedError(sharedError);
}
template <typename E>
void setSharedError(bool sharedError, const E& e) {
this->sharedKeyReader.setSharedError(sharedError, e);
}
protected:
S3Url s3Url;
private:
bool eof;
ChunkStatus status;
pthread_mutex_t statusMutex;
pthread_cond_t statusCondVar;
uint64_t curFileOffset;
uint64_t curChunkOffset;
uint64_t chunkDataSize;
S3VectorUInt8 chunkData;
OffsetMgr& offsetMgr;
S3Interface* s3Interface;
S3KeyReader& sharedKeyReader;
};
#endif /* INCLUDE_S3KEY_READER_H_ */
相关信息
相关文章
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦