greenplumn zstd_compression 源码

  • 2022-08-18
  • 浏览 (702)

greenplumn zstd_compression 代码

文件路径:/gpcontrib/zstd/zstd_compression.c

/*---------------------------------------------------------------------
 *
 * zstd_compression.c
 *
 * IDENTIFICATION
 *	    gpcontrib/zstd/zstd_compression.c
 *
 *---------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/genam.h"
#include "catalog/pg_compression.h"
#include "fmgr.h"
#include "storage/gp_compress.h"
#include "utils/builtins.h"

#include <zstd.h>
#include <zstd_errors.h>

/* Prototypes for the fmgr-callable entry points defined in this file. */
Datum		zstd_constructor(PG_FUNCTION_ARGS);
Datum		zstd_destructor(PG_FUNCTION_ARGS);
Datum		zstd_compress(PG_FUNCTION_ARGS);
Datum		zstd_decompress(PG_FUNCTION_ARGS);
Datum		zstd_validator(PG_FUNCTION_ARGS);

/* One PG_FUNCTION_INFO_V1 declaration per entry point above. */
PG_FUNCTION_INFO_V1(zstd_constructor);
PG_FUNCTION_INFO_V1(zstd_destructor);
PG_FUNCTION_INFO_V1(zstd_compress);
PG_FUNCTION_INFO_V1(zstd_decompress);
PG_FUNCTION_INFO_V1(zstd_validator);

/* The module magic block is left out when building with UNIT_TESTING defined. */
#ifndef UNIT_TESTING
PG_MODULE_MAGIC;
#endif

/*
 * Internal state for zstd.
 *
 * One instance is allocated by zstd_constructor and stored in the
 * CompressionState's opaque pointer; zstd_destructor frees it.
 */
typedef struct zstd_state
{
	int			level;			/* Compression level */
	bool		compress;		/* Compress if true, decompress otherwise */

	zstd_context *ctx;			/* ZSTD compression/decompression contexts */
} zstd_state;

Datum
zstd_constructor(PG_FUNCTION_ARGS)
{
	/* PG_GETARG_POINTER(0) is TupleDesc that is currently unused. */

	StorageAttributes *sa = (StorageAttributes *) PG_GETARG_POINTER(1);
	CompressionState *cs = palloc0(sizeof(CompressionState));
	zstd_state *state = palloc0(sizeof(zstd_state));
	bool		compress = PG_GETARG_BOOL(2);

	if (!PointerIsValid(sa->comptype))
		elog(ERROR, "zstd_constructor called with no compression type");

	cs->opaque = (void *) state;
	cs->desired_sz = NULL;

	if (sa->complevel == 0)
		sa->complevel = 1;

	state->level = sa->complevel;
	state->compress = compress;

	state->ctx = zstd_alloc_context();
	state->ctx->cctx = ZSTD_createCCtx();
	state->ctx->dctx = ZSTD_createDCtx();

	if (!state->ctx->cctx)
		elog(ERROR, "out of memory");
	if (!state->ctx->dctx)
		elog(ERROR, "out of memory");

	PG_RETURN_POINTER(cs);
}

/*
 * zstd_destructor
 *
 * Release the zstd-specific state hanging off a CompressionState.
 *
 * fmgr arguments:
 *	0 - CompressionState *, may be NULL
 *
 * Frees the zstd context and the zstd_state stored in cs->opaque. The
 * CompressionState itself is not freed here. Calling this with a NULL
 * state, or with a state whose opaque pointer is already NULL, is a no-op.
 */
Datum
zstd_destructor(PG_FUNCTION_ARGS)
{
	CompressionState *cs = (CompressionState *) PG_GETARG_POINTER(0);

	if (cs != NULL && cs->opaque != NULL)
	{
		zstd_state *state = (zstd_state *) cs->opaque;

		zstd_free_context(state->ctx);
		pfree(state);

		/*
		 * Clear the now-dangling pointer so that the NULL guard above
		 * actually protects against a second destructor call on the same
		 * CompressionState.
		 */
		cs->opaque = NULL;
	}

	PG_RETURN_VOID();
}

/*
 * zstd compression implementation
 *
 * Note that when compression fails due to algorithm inefficiency,
 * dst_used is set so src_sz, but the output buffer contents are left unchanged
 */
Datum
zstd_compress(PG_FUNCTION_ARGS)
{
	/* FIXME: Change types to ZSTD::size_t */
	const void *src = PG_GETARG_POINTER(0);
	int32		src_sz = PG_GETARG_INT32(1);
	void	   *dst = PG_GETARG_POINTER(2);
	int32		dst_sz = PG_GETARG_INT32(3);
	int32	   *dst_used = (int32 *) PG_GETARG_POINTER(4);
	CompressionState *cs = (CompressionState *) PG_GETARG_POINTER(5);
	zstd_state *state = (zstd_state *) cs->opaque;

	unsigned long dst_length_used;

	dst_length_used = ZSTD_compressCCtx(state->ctx->cctx,
										dst, dst_sz,
										src, src_sz,
										state->level);

	if (ZSTD_isError(dst_length_used))
	{
		if (ZSTD_getErrorCode(dst_length_used) == ZSTD_error_dstSize_tooSmall)
		{
			/*
			 * This error is returned when "compressed" output is bigger than
			 * uncompressed input. The caller can detect this by checking
			 * dst_used >= src_size
			 */
			dst_length_used = src_sz;
		}
		else
			elog(ERROR, "%s", ZSTD_getErrorName(dst_length_used));
	}

	*dst_used = (int32) dst_length_used;

	PG_RETURN_VOID();
}

/*
 * zstd decompression implementation
 *
 * fmgr arguments:
 *	0 - const void *src		compressed input buffer
 *	1 - int32 src_sz		number of compressed bytes, must be > 0
 *	2 - void *dst			output buffer
 *	3 - int32 dst_sz		capacity of the output buffer, must be > 0
 *	4 - int32 *dst_used		out: number of bytes written to dst
 *	5 - CompressionState *	state built by zstd_constructor
 *
 * Raises ERROR on a non-positive buffer size or on any error reported by
 * zstd.
 */
Datum
zstd_decompress(PG_FUNCTION_ARGS)
{
	/* Sizes arrive as int32 through fmgr; they are widened for zstd below. */
	const void *src = PG_GETARG_POINTER(0);
	int32		src_sz = PG_GETARG_INT32(1);
	void	   *dst = PG_GETARG_POINTER(2);
	int32		dst_sz = PG_GETARG_INT32(3);
	int32	   *dst_used = (int32 *) PG_GETARG_POINTER(4);
	CompressionState *cs = (CompressionState *) PG_GETARG_POINTER(5);
	zstd_state *state = (zstd_state *) cs->opaque;

	/*
	 * zstd reports both sizes and error codes as size_t. A narrower type
	 * (unsigned long is 32 bits on LLP64 platforms) would truncate the error
	 * codes and make ZSTD_isError() miss them.
	 */
	size_t		dst_length_used;

	if (src_sz <= 0)
		elog(ERROR, "invalid source buffer size %d", src_sz);
	if (dst_sz <= 0)
		elog(ERROR, "invalid destination buffer size %d", dst_sz);

	dst_length_used = ZSTD_decompressDCtx(state->ctx->dctx,
										  dst, (size_t) dst_sz,
										  src, (size_t) src_sz);

	if (ZSTD_isError(dst_length_used))
	{
		elog(ERROR, "%s", ZSTD_getErrorName(dst_length_used));
	}

	/* Fits in int32: a successful result never exceeds dst_sz. */
	*dst_used = (int32) dst_length_used;

	PG_RETURN_VOID();
}

/*
 * zstd_validator
 *
 * Performs no checks: returns immediately without looking at its arguments.
 */
Datum
zstd_validator(PG_FUNCTION_ARGS)
{
	PG_RETURN_VOID();
}

相关信息

greenplumn 源码目录

相关文章

greenplumn adminpack 源码

greenplumn verify_nbtree 源码

greenplumn auth_delay 源码

greenplumn auto_explain 源码

greenplumn blcost 源码

greenplumn blinsert 源码

greenplumn bloom 源码

greenplumn blscan 源码

greenplumn blutils 源码

greenplumn blvacuum 源码

0  赞