greenplumn decompress_reader 源码
greenplumn decompress_reader 代码
文件路径:/gpcontrib/gpcloud/src/decompress_reader.cpp
#include "decompress_reader.h"
// Size, in bytes, of each of DecompressReader's two buffers (compressed input and inflated
// output) as allocated by its constructor. Starts at S3_ZIP_DEFAULT_CHUNKSIZE.
// NOTE(review): left non-const, presumably so unit tests can shrink it together with
// resizeDecompressReaderBuffer() -- confirm against the test code.
uint64_t S3_ZIP_DECOMPRESS_CHUNKSIZE = S3_ZIP_DEFAULT_CHUNKSIZE;
// Builds a reader that starts out closed, has no underlying reader attached yet, and owns two
// freshly allocated buffers of S3_ZIP_DECOMPRESS_CHUNKSIZE bytes each: 'in' receives compressed
// bytes, 'out' receives inflated bytes. The read cursor into 'out' starts at zero.
DecompressReader::DecompressReader() : isClosed(true) {
    outOffset = 0;
    reader = NULL;

    in = new char[S3_ZIP_DECOMPRESS_CHUNKSIZE];
    out = new char[S3_ZIP_DECOMPRESS_CHUNKSIZE];
}
// Closes the reader if it is still open and releases both buffers.
// The underlying reader set through setReader() is not deleted here.
DecompressReader::~DecompressReader() {
    this->close();

    // Both buffers come from new char[...], so they must be released with the array form of
    // delete; using scalar delete on them is undefined behaviour.
    delete[] this->in;
    delete[] this->out;
}
// Used for unit test to adjust buffer size.
//
// Replaces both the 'in' and 'out' buffers with new ones of 'size' bytes, rewinds the read
// cursor into 'out', and records 'size' as the free space of the zlib output buffer.
// Any data still held in the old buffers is discarded.
void DecompressReader::resizeDecompressReaderBuffer(uint64_t size) {
    // The buffers were allocated with new char[...]; release them with delete[] (scalar delete
    // on an array allocation is undefined behaviour).
    delete[] this->in;
    delete[] this->out;

    this->in = new char[size];
    this->out = new char[size];

    this->outOffset = 0;
    this->zstream.avail_out = size;
}
// Attaches the underlying reader that supplies the compressed bytes.
// Only the pointer is stored; open(), decompress() and close() forward to it later.
void DecompressReader::setReader(Reader *reader)
{
    this->reader = reader;
}
// Prepares the zlib inflate state and then opens the underlying reader.
//
// params: forwarded unchanged to the underlying reader's open().
// Raises S3RuntimeError (through S3_CHECK_OR_DIE) when zlib cannot be initialized.
// A reader must have been attached with setReader() beforehand; it is dereferenced here.
void DecompressReader::open(const S3Params &params) {
    // allocate inflate state for zlib
    zstream.zalloc = Z_NULL;
    zstream.zfree = Z_NULL;
    zstream.opaque = Z_NULL;

    // No compressed input yet; the whole 'out' buffer is available for inflated data.
    zstream.next_in = Z_NULL;
    zstream.next_out = (Byte *)this->out;
    zstream.avail_in = 0;
    zstream.avail_out = S3_ZIP_DECOMPRESS_CHUNKSIZE;

    this->outOffset = 0;

    // with S3_INFLATE_WINDOWSBITS, it could recognize and decode both zlib and gzip stream.
    int ret = inflateInit2(&zstream, S3_INFLATE_WINDOWSBITS);
    S3_CHECK_OR_DIE(ret == Z_OK, S3RuntimeError, "failed to initialize zlib library");

    // Mark as open before touching the underlying reader so that close() releases the zlib
    // state even if the call below fails.
    this->isClosed = false;

    this->reader->open(params);
}
// Copies up to bufSize bytes of decompressed data into buf and returns how many were copied.
//
// Data is served from the 'out' buffer starting at the current cursor. When everything in 'out'
// has already been handed out, one decompress() pass refills it first. A return value of 0
// means that pass produced no bytes (or bufSize was 0).
uint64_t DecompressReader::read(char *buf, uint64_t bufSize) {
    uint64_t available = this->getDecompressedBytesNum() - this->outOffset;

    if (available == 0) {
        // 'out' is drained: inflate the next portion and restart from the front of the buffer.
        this->decompress();
        this->outOffset = 0;
        available = this->getDecompressedBytesNum();
    }

    // Never copy more than the caller's buffer can hold.
    const uint64_t toCopy = (bufSize < available) ? bufSize : available;

    memcpy(buf, this->out + this->outOffset, toCopy);
    this->outOffset += toCopy;

    return toCopy;
}
// Read compressed data from underlying reader and decompress to this->out buffer.
// If no more data to consume, this->zstream.avail_out == S3_ZIP_DECOMPRESS_CHUNKSIZE;
//
// Each call performs at most one inflate() pass:
//   - if the 'in' buffer has been fully consumed, it is refilled from the underlying reader
//     first (returning early, with an empty 'out', when the reader is at EOF);
//   - otherwise the leftover bytes in 'in' are inflated.
// Raises S3RuntimeError (through S3_CHECK_OR_DIE) when zlib reports an error or asks for a
// preset dictionary; the inflate state is released before raising.
void DecompressReader::decompress() {
    // Every pass writes from the start of 'out', so the whole buffer is free again.
    this->zstream.avail_out = S3_ZIP_DECOMPRESS_CHUNKSIZE;
    this->zstream.next_out = (Byte *)this->out;

    if (this->zstream.avail_in == 0) {
        // read S3_ZIP_DECOMPRESS_CHUNKSIZE data from underlying reader and put into this->in
        // buffer. read() might happen more than once when reaching EOF, make sure every time read()
        // will return 0.
        uint64_t hasRead = this->reader->read(this->in, S3_ZIP_DECOMPRESS_CHUNKSIZE);

        // EOF, no more data to decompress.
        if (hasRead == 0) {
            S3DEBUG(
                "No more data to decompress: avail_in = %u, avail_out = %u, total_in = %u, "
                "total_out = %u",
                zstream.avail_in, zstream.avail_out,
                (unsigned int) zstream.total_in, (unsigned int) zstream.total_out);
            return;
        }

        // Fill this->in as possible as it could, otherwise data in this->in might not be able to be
        // inflated.
        while (hasRead < S3_ZIP_DECOMPRESS_CHUNKSIZE) {
            uint64_t count =
                this->reader->read(this->in + hasRead, S3_ZIP_DECOMPRESS_CHUNKSIZE - hasRead);

            if (count == 0) {
                break;
            }

            hasRead += count;
        }

        this->zstream.next_in = (Byte *)this->in;
        this->zstream.avail_in = hasRead;
    }
    // Otherwise there is still data in the 'in' buffer to decode; nothing to refill.

    int status = inflate(&this->zstream, Z_NO_FLUSH);

    if (status == Z_STREAM_END) {
        S3DEBUG("Decompression finished: Z_STREAM_END.");
    } else if (status < 0 || status == Z_NEED_DICT) {
        inflateEnd(&this->zstream);

        // zlib error codes are negative, so format the status as a signed value; an unsigned
        // cast would print e.g. 18446744073709551613 instead of -3.
        S3_CHECK_OR_DIE(
            false, S3RuntimeError,
            string("Failed to decompress data: ") + std::to_string((long long)status));
    }
}
// Releases the zlib inflate state and closes the underlying reader.
// Calling it on an already closed reader does nothing.
void DecompressReader::close() {
    if (this->isClosed) {
        return;
    }

    inflateEnd(&zstream);
    this->reader->close();

    // Only flagged as closed once both steps above have completed.
    this->isClosed = true;
}
相关信息
相关文章
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦