greenplum s3bucket_reader 源码
greenplum s3bucket_reader 代码
文件路径:/gpcontrib/gpcloud/src/s3bucket_reader.cpp
#include "s3bucket_reader.h"
// Default-constructs a bucket reader in its "nothing opened yet" state.
// Both collaborator pointers are left NULL by this constructor.
S3BucketReader::S3BucketReader() : Reader() {
    // Collaborators: not provided here.
    s3Interface = NULL;
    upstreamReader = NULL;

    // Placeholder value only; open() assigns the real starting index.
    keyIndex = 0;

    // Iteration flags: the first read() has to open a key, and that key
    // counts as the first file.
    isFirstFile = true;
    needNewReader = true;
}
// Destructor: all cleanup is delegated to close().
S3BucketReader::~S3BucketReader() {
    close();
}
void S3BucketReader::open(const S3Params& params) {
this->params = params;
this->keyIndex = s3ext_segid; // we may change it in unit tests
S3_CHECK_OR_DIE(this->s3Interface != NULL, S3RuntimeError, "s3Interface is NULL");
S3Url& s3Url = this->params.getS3Url();
S3_CHECK_OR_DIE(s3Url.isValidUrl(), S3ConfigError, s3Url.getFullUrlForCurl() + " is not valid",
s3Url.getFullUrlForCurl());
this->keyList = this->s3Interface->listBucket(s3Url);
}
// Returns the listing entry at the current cursor and then moves the cursor
// forward by s3ext_segnum.
//
// No range check is done here; the index must already be valid on entry.
BucketContent& S3BucketReader::getNextKey() {
    BucketContent& picked = keyList.contents[keyIndex];

    // Stride to the next entry to be read by this reader.
    keyIndex += s3ext_segnum;

    return picked;
}
// Derives the parameters used to open a single key: the key name is
// URI-encoded (with "/" kept literal) and applied as the prefix, and the
// key's size is recorded on the result.
S3Params S3BucketReader::constructReaderParams(BucketContent& key) {
    // Encode the whole name, then restore the path separators so the result
    // looks like "/encoded_path/encoded_name".
    string encodedName = UriEncode(key.getName());
    FindAndReplace(encodedName, "%2F", "/");

    S3Params result = params.setPrefix(encodedName);
    result.setKeySize(key.getSize());

    S3DEBUG("key: %s, size: %" PRIu64, result.getS3Url().getFullUrlForCurl().c_str(),
            result.getKeySize());

    return result;
}
// Consumes data from upstreamReader up to and including the first occurrence
// of the line terminator `eolString` (a NUL-terminated byte pattern), then
// leaves whatever followed the terminator in the last chunk at the front of
// `buf`.
//
// @param buf    destination buffer, also used as scratch space while scanning
// @param count  maximum number of bytes requested from upstreamReader per read
// @return number of bytes now valid at the start of `buf`. Returns 0 when
//         upstreamReader reports end of data before the terminator is found
//         (a warning is logged), and also when the terminator ended exactly
//         at the end of the last chunk read.
//
// The terminator may straddle two reads: the match position in `eolString`
// is kept across refills of `buf`.
uint64_t S3BucketReader::readWithoutHeaderLine(char* buf, uint64_t count) {
    char* current = NULL;
    char* end = NULL;
    char* currentEOL = eolString;

    // Each pass of this loop matches at most one more terminator byte.
    while (*currentEOL != '\0') {
        // Buffer exhausted (or never filled): fetch the next chunk.
        if (current == end) {
            uint64_t readCount = this->upstreamReader->read(buf, count);

            // we have reach the end of file but found no matching EOL.
            if (readCount == 0) {
                S3WARN("%s", "Reach end of file before matching line terminator");
                return 0;
            }

            current = buf;
            end = buf + readCount;
        }

        // Scan forward until one byte extends the current match.
        for (; current != end; current++) {
            if (*current != *currentEOL) {
                // The match in progress (if any) is broken, so start over.
                // The byte that broke it may itself begin the terminator
                // (e.g. "\r\r\n" with a "\r\n" terminator), so it is tested
                // again against the first terminator byte instead of being
                // skipped unconditionally.
                // NOTE(review): this restart only retries from the first
                // terminator byte; it is not a full KMP fallback, which
                // appears sufficient for one- or two-byte terminators.
                currentEOL = eolString;
                if (*current != *currentEOL) {
                    continue;
                }
            }

            currentEOL++;
            current++;
            break;
        }
    }

    // move remained data to front.
    uint64_t remain = end - current;
    char* p = buf;
    while (current != end) {
        *p = *current;
        p++;
        current++;
    }

    return remain;
}
uint64_t S3BucketReader::read(char* buf, uint64_t count) {
S3_CHECK_OR_DIE(this->upstreamReader != NULL, S3RuntimeError, "upstreamReader is NULL");
uint64_t readCount = 0;
while (true) {
if (this->needNewReader) {
if (this->keyIndex >= this->keyList.contents.size()) {
S3DEBUG("Read finished for segment: %d", s3ext_segid);
return 0;
}
BucketContent& key = this->getNextKey();
this->upstreamReader->open(constructReaderParams(key));
this->needNewReader = false;
// ignore header line if it is not the first file
if (hasHeader && !this->isFirstFile) {
readCount = readWithoutHeaderLine(buf, count);
if (readCount != 0) {
return readCount;
}
}
}
readCount = this->upstreamReader->read(buf, count);
if (readCount != 0) {
return readCount;
}
// Finished one file, continue to next
this->upstreamReader->close();
this->needNewReader = true;
this->isFirstFile = false;
}
}
void S3BucketReader::close() {
if (this->upstreamReader != NULL) {
this->upstreamReader->close();
this->upstreamReader = NULL;
}
if (!this->keyList.contents.empty()) {
this->keyList.contents.clear();
}
}
相关信息
相关文章
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦