greenplum appendonlyam 源码

  • 2022-08-18
  • 浏览 (433)

greenplum appendonlyam 代码

文件路径:/src/backend/access/appendonly/appendonlyam.c

/*-------------------------------------------------------------------------
 *
 * appendonlyam.c
 *	  append-only relation access method code
 *
 * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 * Portions Copyright (c) 2008-2009, Greenplum Inc.
 * Portions Copyright (c) 2012-Present VMware, Inc. or its affiliates.
 *
 *
 * IDENTIFICATION
 *	    src/backend/access/appendonly/appendonlyam.c
 *
 *
 * INTERFACE ROUTINES
 *		appendonly_beginscan		- begin relation scan
 *		appendonly_rescan			- restart a relation scan
 *		appendonly_endscan			- end relation scan
 *		appendonly_getnextslot		- retrieve next tuple in scan
 *		appendonly_insert_init		- initialize an insert operation
 *		appendonly_insert			- insert tuple into a relation
 *		appendonly_insert_finish	- finish an insert operation
 *
 * NOTES
 *	  This file contains the appendonly_ routines which implement
 *	  the access methods used for all append-only relations.
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include <math.h>
#include "catalog/storage.h"
#include "access/multixact.h"
#include "catalog/storage_xlog.h"

#include "access/aosegfiles.h"
#include "access/appendonlytid.h"
#include "access/appendonlywriter.h"
#include "access/aomd.h"
#include "access/transam.h"
#include "access/tupdesc.h"
#include "access/tuptoaster.h"
#include "access/valid.h"
#include "access/xact.h"
#include "catalog/catalog.h"
#include "catalog/gp_fastsequence.h"
#include "catalog/namespace.h"
#include "catalog/pg_appendonly.h"
#include "catalog/pg_attribute_encoding.h"
#include "cdb/cdbappendonlyam.h"
#include "cdb/cdbappendonlystorage.h"
#include "cdb/cdbappendonlystorageformat.h"
#include "cdb/cdbappendonlystoragelayer.h"
#include "cdb/cdbvars.h"
#include "fmgr.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "utils/datum.h"
#include "utils/faultinjector.h"
#include "utils/guc.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/snapmgr.h"

/*
 * AppendOnlyDeleteDescData is used to delete data from append-only
 * relations. It serves a purpose equivalent to that of
 * AppendOnlyScanDescData (relscan.h), except that the latter is used for
 * scanning append-only relations.
 */
typedef struct AppendOnlyDeleteDescData
{
	/*
	 * Relation to delete from
	 */
	Relation	aod_rel;

	/*
	 * Snapshot to use for metadata operations
	 */
	Snapshot	appendOnlyMetaDataSnapshot;

	/*
	 * Visibility map
	 */
	AppendOnlyVisimap visibilityMap;

	/*
	 * Visimap delete support structure. Used to handle out-of-order deletes
	 */
	AppendOnlyVisimapDelete visiMapDelete;

}			AppendOnlyDeleteDescData;

/*
 * AppendOnlyUpdateDescData is used to update data in append-only
 * relations. It serves a purpose equivalent to that of
 * AppendOnlyScanDescData (relscan.h), except that the latter is used for
 * scanning append-only relations.
 */
typedef struct AppendOnlyUpdateDescData
{
	/*
	 * Insert descriptor.  NOTE(review): presumably used to append the new
	 * version of an updated row -- confirm against the update routines.
	 */
	AppendOnlyInsertDesc aoInsertDesc;

	/*
	 * Visibility map
	 */
	AppendOnlyVisimap visibilityMap;

	/*
	 * Visimap delete support structure. Used to handle out-of-order deletes
	 */
	AppendOnlyVisimapDelete visiMapDelete;

}			AppendOnlyUpdateDescData;

/*
 * Kind of content the executor layer of this module has stored in a block,
 * as recorded in the Append-Only Storage header and interpreted again when
 * the block is read back.
 */
typedef enum AoExecutorBlockKind
{
	AoExecutorBlockKind_None = 0,	/* no kind assigned */
	AoExecutorBlockKind_VarBlock,	/* content is a VarBlock holding items */
	AoExecutorBlockKind_SingleRow,	/* content is exactly one row */
	MaxAoExecutorBlockKind		/* must always be last */
}			AoExecutorBlockKind;

/*
 * Forward declarations of the executor read-block helpers that are used
 * before their definitions further down in this file.
 */
static void AppendOnlyExecutionReadBlock_SetSegmentFileNum(AppendOnlyExecutorReadBlock *executorReadBlock,
														   int segmentFileNum);
static void AppendOnlyExecutionReadBlock_SetPositionInfo(AppendOnlyExecutorReadBlock *executorReadBlock,
														 int64 blockFirstRowNum);
static void AppendOnlyExecutorReadBlock_Init(AppendOnlyExecutorReadBlock *executorReadBlock,
											 Relation relation,
											 MemoryContext memoryContext,
											 AppendOnlyStorageRead *storageRead,
											 int32 usableBlockSize);
static void AppendOnlyExecutorReadBlock_Finish(AppendOnlyExecutorReadBlock *executorReadBlock);
static void AppendOnlyExecutorReadBlock_ResetCounts(AppendOnlyExecutorReadBlock *executorReadBlock);

/* ----------------
 *		initscan - scan code common to appendonly_beginscan and appendonly_rescan
 *
 * Puts the scan descriptor back into its "nothing read yet" state: no
 * segment file selected, no buffer pending, and the cached memtuple binding
 * pointer cleared.  When 'key' is non-NULL, scan->aos_nkeys scan keys are
 * copied from it into scan->aos_key.
 * ----------------
 */
static void
initscan(AppendOnlyScanDesc scan, ScanKey key)
{
	/* Take our own copy of the caller's scan keys, if any were supplied. */
	if (key != NULL)
	{
		memcpy(scan->aos_key, key, scan->aos_nkeys * sizeof(ScanKeyData));
	}

	/* No segment file is selected yet; the next read must pick one. */
	scan->aos_filenamepath[0] = '\0';
	scan->aos_segfiles_processed = 0;
	scan->aos_done_all_segfiles = false;
	scan->aos_need_new_segfile = true;

	/* Nothing is buffered either. */
	scan->bufferDone = true;

	/* The read block only has counters worth resetting once it is set up. */
	if (scan->initedStorageRoutines)
	{
		AppendOnlyExecutorReadBlock_ResetCounts(&scan->executorReadBlock);
	}

	/* The binding is (re)created lazily from the slot handed to the scan. */
	scan->executorReadBlock.mt_bind = NULL;

	pgstat_count_heap_scan(scan->aos_rd);
}

/*
 * Open the next file segment to scan and allocate all resources needed for it.
 *
 * On the first call for a scan this also performs the one-time setup of the
 * storage read layer, the compression state and the executor read block.
 *
 * Segment files whose EOF is zero, or that are in state
 * AOSEG_STATE_AWAITING_DROP, are skipped.
 *
 * Returns true when a segment file was opened and is ready for reading;
 * returns false (and marks the scan as done with all segment files) when no
 * readable segment file is left.
 */
static bool
SetNextFileSegForRead(AppendOnlyScanDesc scan)
{
	Relation	reln = scan->aos_rd;
	FileSegInfo *nextSeg = NULL;	/* segment file chosen for reading */
	int			segno;
	int			formatversion;
	int64		eof;
	int32		fileSegNo;

	/* Only legal right after a segment file completed ... */
	Assert(scan->aos_need_new_segfile);
	/* ... and never after we have reported that nothing is left. */
	Assert(!scan->aos_done_all_segfiles);

	/*
	 * One-time initialization of the storage routines.
	 */
	if (!scan->initedStorageRoutines)
	{
		MemoryContext callerContext;
		PGFunction *fns;

		AppendOnlyStorageRead_Init(&scan->storageRead,
								   scan->aoScanInitContext,
								   scan->usableBlockSize,
								   NameStr(scan->aos_rd->rd_rel->relname),
								   scan->title,
								   &scan->storageAttributes);

		/*
		 * The current memory context is not guaranteed to survive between
		 * calls, so the compression information is set up in the scan's own
		 * init context.
		 */
		callerContext = MemoryContextSwitchTo(scan->aoScanInitContext);

		/* Look up the compression functions for this relation. */
		fns = get_funcs_for_compression(scan->storageAttributes.compressType);
		scan->storageRead.compression_functions = fns;

		if (fns != NULL)
		{
			StorageAttributes sa;

			sa.comptype = scan->storageAttributes.compressType;
			sa.complevel = scan->storageAttributes.compressLevel;
			sa.blocksize = scan->usableBlockSize;

			/*
			 * The constructor is given the relation's tuple descriptor so
			 * that it can base (de)compression decisions on the relation's
			 * structure.
			 */
			scan->storageRead.compressionState =
				callCompressionConstructor(fns[COMPRESSION_CONSTRUCTOR],
										   RelationGetDescr(reln),
										   &sa,
										   false /* decompress */ );
		}

		/* Back to the caller's memory context. */
		MemoryContextSwitchTo(callerContext);

		AppendOnlyExecutorReadBlock_Init(&scan->executorReadBlock,
										 scan->aos_rd,
										 scan->aoScanInitContext,
										 &scan->storageRead,
										 scan->usableBlockSize);

		/* Make sure a fresh buffer is read straight away. */
		scan->bufferDone = true;
		scan->initedStorageRoutines = true;
	}

	/*
	 * Walk the remaining segment files until one with data is found.  Every
	 * entry looked at counts as processed, whether it is used or skipped.
	 */
	while (nextSeg == NULL &&
		   scan->aos_segfiles_processed < scan->aos_total_segfiles)
	{
		FileSegInfo *candidate = scan->aos_segfile_arr[scan->aos_segfiles_processed];

		scan->aos_segfiles_processed++;

		/*
		 * Empty segment files and lingering dropped ones (which we treat as
		 * dead, too) are passed over.
		 */
		if ((int64) candidate->eof > 0 &&
			candidate->state != AOSEG_STATE_AWAITING_DROP)
			nextSeg = candidate;
	}

	if (nextSeg == NULL)
	{
		/* Every segment file has been read. */
		scan->aos_need_new_segfile = false;
		scan->aos_done_all_segfiles = true;

		return false;
	}

	segno = nextSeg->segno;
	formatversion = nextSeg->formatversion;
	eof = (int64) nextSeg->eof;

	/* Initialize the block directory for inserts if needed. */
	if (scan->blockDirectory)
	{
		AppendOnlyBlockDirectory_Init_forInsert(scan->blockDirectory,
												scan->appendOnlyMetaDataSnapshot,
												nextSeg,
												0,	/* lastSequence */
												scan->aos_rd,
												segno,	/* segno */
												1,	/* columnGroupNo */
												false);
	}

	MakeAOSegmentFileName(reln, segno, -1, &fileSegNo, scan->aos_filenamepath);
	Assert(strlen(scan->aos_filenamepath) + 1 <= scan->aos_filenamepath_maxlen);

	Assert(scan->initedStorageRoutines);

	AppendOnlyStorageRead_OpenFile(&scan->storageRead,
								   scan->aos_filenamepath,
								   formatversion,
								   eof);

	AppendOnlyExecutionReadBlock_SetSegmentFileNum(&scan->executorReadBlock,
												   segno);

	/* Row numbering restarts at 1 for the newly opened segment file. */
	AppendOnlyExecutionReadBlock_SetPositionInfo(&scan->executorReadBlock,
												  /* blockFirstRowNum */ 1);

	/* The segment file is open; no new one is needed for now. */
	scan->aos_need_new_segfile = false;

	elogif(Debug_appendonly_print_scan, LOG,
		   "Append-only scan initialize for table '%s', %u/%u/%u, segment file %u, EOF " INT64_FORMAT ", "
		   "(compression = %s, usable blocksize %d)",
		   NameStr(scan->aos_rd->rd_rel->relname),
		   scan->aos_rd->rd_node.spcNode,
		   scan->aos_rd->rd_node.dbNode,
		   scan->aos_rd->rd_node.relNode,
		   segno,
		   eof,
		   (scan->storageAttributes.compress ? "true" : "false"),
		   scan->usableBlockSize);

	return true;
}

/*
 * errcontext_appendonly_insert_block_user_limit
 *
 * Attach an errcontext() line to the error being reported.  Only the table
 * name is shown, because this context is meant for errors caused by the
 * user.
 *
 * Always returns 0.
 */
static int
errcontext_appendonly_insert_block_user_limit(AppendOnlyInsertDesc aoInsertDesc)
{
	errcontext("Append-Only table '%s'",
			   NameStr(aoInsertDesc->aoi_rel->rd_rel->relname));

	return 0;
}

/*
 * Open the next file segment for write.
 *
 * Builds the path name of segment file aoInsertDesc->cur_segno, loads its
 * FileSegInfo into aoInsertDesc->fsInfo, creates the file under the current
 * transaction when required, and opens it for appending at its recorded EOF.
 * The per-segment insert counters in the descriptor are reset.
 *
 * Raises ERROR when the segment file is in state AOSEG_STATE_AWAITING_DROP.
 */
static void
SetCurrentFileSegForWrite(AppendOnlyInsertDesc aoInsertDesc)
{
	RelFileNodeBackend rnode;

	FileSegInfo *fsinfo;
	int64		eof;
	int64		eof_uncompressed;
	int64		varblockcount;
	int32		fileSegNo;

	rnode.node = aoInsertDesc->aoi_rel->rd_node;
	rnode.backend = aoInsertDesc->aoi_rel->rd_backend;

	/* Make the 'segment' file name */
	MakeAOSegmentFileName(aoInsertDesc->aoi_rel,
						  aoInsertDesc->cur_segno, -1,
						  &fileSegNo,
						  aoInsertDesc->appendFilePathName);
	Assert(strlen(aoInsertDesc->appendFilePathName) + 1 <= aoInsertDesc->appendFilePathNameMaxLen);

	/*
	 * Now, get the information for the file segment we are going to append
	 * to.
	 */
	fsinfo = GetFileSegInfo(aoInsertDesc->aoi_rel,
							aoInsertDesc->appendOnlyMetaDataSnapshot,
							aoInsertDesc->cur_segno,
							true);
	aoInsertDesc->fsInfo = fsinfo;

	/*
	 * Check the pointer before its first dereference; an assertion placed
	 * after the state test below could never catch a NULL result.
	 */
	Assert(fsinfo);

	/* Never insert into a segment that is awaiting a drop */
	if (fsinfo->state == AOSEG_STATE_AWAITING_DROP)
		elog(ERROR, "cannot insert into segno (%d) from AO relid %u that is in state AOSEG_STATE_AWAITING_DROP",
			 aoInsertDesc->cur_segno, RelationGetRelid(aoInsertDesc->aoi_rel));

	eof = (int64) fsinfo->eof;
	eof_uncompressed = (int64) fsinfo->eof_uncompressed;
	varblockcount = (int64) fsinfo->varblockcount;
	aoInsertDesc->rowCount = fsinfo->total_tupcount;

	/*
	 * Segment file #0 is created when the Append-Only table is created.
	 *
	 * Other segment files are created on-demand under transaction.
	 */
	if (aoInsertDesc->cur_segno > 0 && eof == 0)
	{
		AppendOnlyStorageWrite_TransactionCreateFile(&aoInsertDesc->storageWrite,
													 &rnode,
													 aoInsertDesc->cur_segno);
	}

	/*
	 * Open the existing file for write.
	 */
	AppendOnlyStorageWrite_OpenFile(&aoInsertDesc->storageWrite,
									aoInsertDesc->appendFilePathName,
									fsinfo->formatversion,
									eof,
									eof_uncompressed,
									&rnode,
									aoInsertDesc->cur_segno);

	/* reset counts */
	aoInsertDesc->insertCount = 0;
	aoInsertDesc->varblockCount = 0;

	/*
	 * Use the current block count from the segfile info so our system log
	 * error messages are accurate.
	 */
	aoInsertDesc->bufferCount = varblockcount;
}

/*
 * Finished scanning this file segment. Close it.
 *
 * Also flags the scan as needing a new segment file, so that the next read
 * selects one.
 */
static void
CloseScannedFileSeg(AppendOnlyScanDesc scan)
{
	AppendOnlyStorageRead *storageRead = &scan->storageRead;

	AppendOnlyStorageRead_CloseFile(storageRead);
	scan->aos_need_new_segfile = true;
}

/*
 * Finished writing to this file segment. Update catalog and close file.
 *
 * The segment file is flushed and closed first; the file lengths reported by
 * that call, together with the insert and VarBlock counts accumulated in the
 * descriptor, are then recorded through UpdateFileSegInfo.  The descriptor's
 * FileSegInfo is freed and its pointer cleared.
 */
static void
CloseWritableFileSeg(AppendOnlyInsertDesc aoInsertDesc)
{
	int64		fileLen;
	int64		fileLen_uncompressed;

	AppendOnlyStorageWrite_TransactionFlushAndCloseFile(&aoInsertDesc->storageWrite,
														&fileLen,
														&fileLen_uncompressed);

	/*
	 * Update the AO segment info table with our new eof
	 */
	UpdateFileSegInfo(aoInsertDesc->aoi_rel,
					  aoInsertDesc->cur_segno,
					  fileLen,
					  fileLen_uncompressed,
					  aoInsertDesc->insertCount,
					  aoInsertDesc->varblockCount,
					  (aoInsertDesc->skipModCountIncrement ? 0 : 1),
					  AOSEG_STATE_USECURRENT);

	pfree(aoInsertDesc->fsInfo);
	aoInsertDesc->fsInfo = NULL;

	/*
	 * This is the insert path, so the message says "insert" rather than
	 * "scan", and the parenthesized detail is properly closed.
	 */
	elogif(Debug_appendonly_print_insert, LOG,
		   "Append-only insert closed write file segment #%d for table %s "
		   "(file length " INT64_FORMAT ", insert count " INT64_FORMAT ", VarBlock count " INT64_FORMAT ")",
		   aoInsertDesc->cur_segno,
		   NameStr(aoInsertDesc->aoi_rel->rd_rel->relname),
		   fileLen,
		   aoInsertDesc->insertCount,
		   aoInsertDesc->varblockCount);

}

/* ------------------------------------------------------------------------------ */

/*
 * AppendOnlyExecutorReadBlock_GetContents
 *
 * Make the content of the current block available in
 * executorReadBlock->dataBuffer and prepare it for tuple extraction.  The
 * block is described by the dataLen, isLarge, isCompressed, rowCount and
 * executorBlockKind fields of executorReadBlock (the fields filled in by
 * AppendOnlyExecutorReadBlock_GetBlockInfo).
 *
 * Content is obtained in one of three ways:
 *	- small, non-compressed: dataBuffer is the buffer returned by
 *	  AppendOnlyStorageRead_GetBuffer;
 *	- large, non-compressed: content is read into the private
 *	  largeContentBuffer, which is grown on demand in
 *	  executorReadBlock->memoryContext;
 *	- compressed: content is read into uncompressedBuffer.
 *
 * Raises ERROR when a VarBlock fails validation, when the row count in the
 * storage header disagrees with the block content, or when the executor
 * block kind is not recognized.
 */
static void
AppendOnlyExecutorReadBlock_GetContents(AppendOnlyExecutorReadBlock *executorReadBlock)
{
	VarBlockCheckError varBlockCheckError;

	if (!executorReadBlock->isCompressed)
	{
		if (!executorReadBlock->isLarge)
		{
			/*
			 * Small content.
			 */
			executorReadBlock->dataBuffer =
				AppendOnlyStorageRead_GetBuffer(executorReadBlock->storageRead);

			elogif(Debug_appendonly_print_scan, LOG,
				   "Append-only scan read small non-compressed block for table '%s' "
				   "(length = %d, segment file '%s', block offset in file = " INT64_FORMAT ")",
				   AppendOnlyStorageRead_RelationName(executorReadBlock->storageRead),
				   executorReadBlock->dataLen,
				   AppendOnlyStorageRead_SegmentFileName(executorReadBlock->storageRead),
				   executorReadBlock->headerOffsetInFile);
		}
		else
		{
			/*
			 * Large row.
			 */

			/* UNDONE: Error out if NOTOAST isn't ON. */

			/* UNDONE: Error out if it is not a single row */
			Assert(executorReadBlock->executorBlockKind == AoExecutorBlockKind_SingleRow);

			/*
			 * Enough room in our private buffer? UNDONE: Is there a way to
			 * avoid the 2nd copy later doProcessTuple?
			 */
			if (executorReadBlock->largeContentBufferLen < executorReadBlock->dataLen)
			{
				MemoryContext oldMemoryContext;

				/*
				 * Buffer too small.
				 */
				oldMemoryContext =
					MemoryContextSwitchTo(executorReadBlock->memoryContext);

				if (executorReadBlock->largeContentBuffer != NULL)
				{
					/*
					 * Make sure we set our pointer to NULL here in case the
					 * subsequent allocation fails.  Otherwise cleanup will
					 * get confused.
					 */
					pfree(executorReadBlock->largeContentBuffer);
					executorReadBlock->largeContentBuffer = NULL;
				}

				/*
				 * Forget the recorded capacity as well.  If the allocation
				 * below raises an error, a stale length paired with a NULL
				 * pointer would let a later, smaller request skip this
				 * reallocation and read into a NULL buffer.
				 */
				executorReadBlock->largeContentBufferLen = 0;

				executorReadBlock->largeContentBuffer = (uint8 *) palloc(executorReadBlock->dataLen);
				executorReadBlock->largeContentBufferLen = executorReadBlock->dataLen;

				/*
				 * Deallocation and allocation done.  Go back to caller
				 * memory-context.
				 */
				MemoryContextSwitchTo(oldMemoryContext);
			}

			executorReadBlock->dataBuffer = executorReadBlock->largeContentBuffer;

			AppendOnlyStorageRead_Content(executorReadBlock->storageRead,
										  executorReadBlock->dataBuffer,
										  executorReadBlock->dataLen);

			elogif(Debug_appendonly_print_scan, LOG,
				   "Append-only scan read large row for table '%s' "
				   "(length = %d, segment file '%s', "
				   "block offset in file = " INT64_FORMAT ")",
				   AppendOnlyStorageRead_RelationName(executorReadBlock->storageRead),
				   executorReadBlock->dataLen,
				   AppendOnlyStorageRead_SegmentFileName(executorReadBlock->storageRead),
				   executorReadBlock->headerOffsetInFile);
		}
	}
	else
	{
		/* Only needed for the debug message below. */
		int32		compressedLen =
		AppendOnlyStorageRead_CurrentCompressedLen(executorReadBlock->storageRead);

		/*
		 * AppendOnlyStorageWrite does not report compressed for large content
		 * metadata.
		 */
		Assert(!executorReadBlock->isLarge);

		/*
		 * Decompress into our temporary buffer.
		 */
		executorReadBlock->dataBuffer = executorReadBlock->uncompressedBuffer;

		AppendOnlyStorageRead_Content(executorReadBlock->storageRead,
									  executorReadBlock->dataBuffer,
									  executorReadBlock->dataLen);

		elogif(Debug_appendonly_print_scan, LOG,
			   "Append-only scan read decompressed block for table '%s' "
			   "(compressed length %d, length = %d, segment file '%s', "
			   "block offset in file = " INT64_FORMAT ")",
			   AppendOnlyStorageRead_RelationName(executorReadBlock->storageRead),
			   compressedLen,
			   executorReadBlock->dataLen,
			   AppendOnlyStorageRead_SegmentFileName(executorReadBlock->storageRead),
			   executorReadBlock->headerOffsetInFile);
	}

	/*
	 * The executorBlockKind value is what the executor -- i.e. the upper part
	 * of this appendonlyam module! -- has stored in the Append-Only Storage
	 * header.  We interpret it here.
	 */

	switch (executorReadBlock->executorBlockKind)
	{
		case AoExecutorBlockKind_VarBlock:
			varBlockCheckError = VarBlockIsValid(executorReadBlock->dataBuffer, executorReadBlock->dataLen);
			if (varBlockCheckError != VarBlockCheckOk)
				ereport(ERROR,
						(errcode(ERRCODE_INTERNAL_ERROR),
						 errmsg("VarBlock is not valid, valid block check error %d, detail '%s'",
								varBlockCheckError,
								VarBlockGetCheckErrorStr()),
						 errdetail_appendonly_read_storage_content_header(executorReadBlock->storageRead),
						 errcontext_appendonly_read_storage_block(executorReadBlock->storageRead)));

			/*
			 * Now use the VarBlock module to extract the items out.
			 */
			VarBlockReaderInit(&executorReadBlock->varBlockReader,
							   executorReadBlock->dataBuffer,
							   executorReadBlock->dataLen);

			executorReadBlock->readerItemCount = VarBlockReaderItemCount(&executorReadBlock->varBlockReader);

			/* No item of this block has been handed out yet. */
			executorReadBlock->currentItemCount = 0;

			if (executorReadBlock->rowCount != executorReadBlock->readerItemCount)
			{
				ereport(ERROR,
						(errcode(ERRCODE_INTERNAL_ERROR),
						 errmsg("row count %d in append-only storage header does not match VarBlock item count %d",
								executorReadBlock->rowCount,
								executorReadBlock->readerItemCount),
						 errdetail_appendonly_read_storage_content_header(executorReadBlock->storageRead),
						 errcontext_appendonly_read_storage_block(executorReadBlock->storageRead)));
			}

			elogif(Debug_appendonly_print_scan, LOG,
				   "append-only scan read VarBlock for table '%s' with %d items (block offset in file = " INT64_FORMAT ")",
				   AppendOnlyStorageRead_RelationName(executorReadBlock->storageRead),
				   executorReadBlock->readerItemCount,
				   executorReadBlock->headerOffsetInFile);
			break;

		case AoExecutorBlockKind_SingleRow:
			if (executorReadBlock->rowCount != 1)
			{
				ereport(ERROR,
						(errcode(ERRCODE_INTERNAL_ERROR),
						 errmsg("row count %d in append-only storage header is not 1 for single row",
								executorReadBlock->rowCount),
						 errdetail_appendonly_read_storage_content_header(executorReadBlock->storageRead),
						 errcontext_appendonly_read_storage_block(executorReadBlock->storageRead)));
			}

			/* The whole block content is the one row. */
			executorReadBlock->singleRow = executorReadBlock->dataBuffer;
			executorReadBlock->singleRowLen = executorReadBlock->dataLen;

			elogif(Debug_appendonly_print_scan, LOG, "Append-only scan read single row for table '%s' with length %d (block offset in file = " INT64_FORMAT ")",
				   AppendOnlyStorageRead_RelationName(executorReadBlock->storageRead),
				   executorReadBlock->singleRowLen,
				   executorReadBlock->headerOffsetInFile);

			break;

		default:
			elog(ERROR, "Unrecognized append-only executor block kind: %d",
				 executorReadBlock->executorBlockKind);
			break;
	}
}

/*
 * AppendOnlyExecutorReadBlock_GetBlockInfo
 *
 * Ask the storage layer for the header information of the next block and
 * store it in executorReadBlock (data length, block kind, first row number,
 * row count, large/compressed flags and header offset in the file).
 *
 * Returns false, leaving the remaining fields untouched by this function,
 * when AppendOnlyStorageRead_GetBlockInfo reports that there is no block;
 * returns true otherwise.
 */
static bool
AppendOnlyExecutorReadBlock_GetBlockInfo(AppendOnlyStorageRead *storageRead,
										 AppendOnlyExecutorReadBlock *executorReadBlock)
{
	/* Remember the running row number; the call below may overwrite it. */
	const int64 previousFirstRowNum = executorReadBlock->blockFirstRowNum;
	bool		gotBlock;

	gotBlock = AppendOnlyStorageRead_GetBlockInfo(storageRead,
												  &executorReadBlock->dataLen,
												  &executorReadBlock->executorBlockKind,
												  &executorReadBlock->blockFirstRowNum,
												  &executorReadBlock->rowCount,
												  &executorReadBlock->isLarge,
												  &executorReadBlock->isCompressed);
	if (!gotBlock)
		return false;

	/*
	 * A block that does not store its first row number reports -1.  The
	 * value maintained by AppendOnlyExecutionReadBlock_SetPositionInfo and
	 * AppendOnlyExecutionReadBlock_FinishedScanBlock is still the right one
	 * in that case, so put it back.
	 */
	if (executorReadBlock->blockFirstRowNum < 0)
		executorReadBlock->blockFirstRowNum = previousFirstRowNum;

	executorReadBlock->headerOffsetInFile =
		AppendOnlyStorageRead_CurrentHeaderOffsetInFile(storageRead);

	/* UNDONE: Check blockFirstRowNum */

	return true;
}

/*
 * Record the number of the segment file the read block is reading from.
 */
static void
AppendOnlyExecutionReadBlock_SetSegmentFileNum(
											   AppendOnlyExecutorReadBlock *executorReadBlock,
											   int segmentFileNum)
{
	executorReadBlock->segmentFileNum = segmentFileNum;
}

/*
 * Set the row number that the first row of the next block will carry.
 */
static void
AppendOnlyExecutionReadBlock_SetPositionInfo(
											 AppendOnlyExecutorReadBlock *executorReadBlock,
											 int64 blockFirstRowNum)
{
	executorReadBlock->blockFirstRowNum = blockFirstRowNum;
}

/*
 * The current block has been consumed: advance the running first-row number
 * past the rows it contained.
 */
static void
AppendOnlyExecutionReadBlock_FinishedScanBlock(
											   AppendOnlyExecutorReadBlock *executorReadBlock)
{
	executorReadBlock->blockFirstRowNum =
		executorReadBlock->blockFirstRowNum + executorReadBlock->rowCount;
}

/*
 * Initialize the ExecutorReadBlock once.  Assumed to be zeroed out before the call.
 *
 * Allocates a zero-filled buffer of usableBlockSize bytes in 'memoryContext'
 * for uncompressed block content, and remembers the storage read handle and
 * the memory context for later allocations.  The 'relation' argument is not
 * used here.
 */
static void
AppendOnlyExecutorReadBlock_Init(AppendOnlyExecutorReadBlock *executorReadBlock,
								 Relation relation,
								 MemoryContext memoryContext,
								 AppendOnlyStorageRead *storageRead,
								 int32 usableBlockSize)
{
	MemoryContext callerContext;
	uint8	   *buffer;

	AssertArg(MemoryContextIsValid(memoryContext));

	/* Allocate inside the supplied context, then restore the caller's. */
	callerContext = MemoryContextSwitchTo(memoryContext);

	buffer = (uint8 *) palloc0(usableBlockSize * sizeof(uint8));
	executorReadBlock->uncompressedBuffer = buffer;

	executorReadBlock->storageRead = storageRead;
	executorReadBlock->memoryContext = memoryContext;

	MemoryContextSwitchTo(callerContext);
}

/*
 * Free the space allocated inside ExecutorReadBlock.
 *
 * Releases the uncompressed buffer, the numeric attribute list and the
 * memtuple binding, clearing each pointer so that a repeated call is
 * harmless.
 *
 * NOTE(review): largeContentBuffer is not released here -- confirm that it
 * is reclaimed together with its memory context.
 */
static void
AppendOnlyExecutorReadBlock_Finish(AppendOnlyExecutorReadBlock *executorReadBlock)
{
	if (executorReadBlock->uncompressedBuffer != NULL)
	{
		pfree(executorReadBlock->uncompressedBuffer);
		executorReadBlock->uncompressedBuffer = NULL;
	}

	if (executorReadBlock->numericAtts != NULL)
	{
		pfree(executorReadBlock->numericAtts);
		executorReadBlock->numericAtts = NULL;
	}

	if (executorReadBlock->mt_bind != NULL)
	{
		pfree(executorReadBlock->mt_bind);
		executorReadBlock->mt_bind = NULL;
	}
}

/*
 * Restart the count of rows scanned through this read block.
 */
static void
AppendOnlyExecutorReadBlock_ResetCounts(
										AppendOnlyExecutorReadBlock *executorReadBlock)
{
	executorReadBlock->totalRowsScannned = 0;
}

/*
 * Given a tuple in 'formatversion', convert it to a format that is
 * understood by the rest of the system.
 *
 * Two conversions may apply:
 *	- realignment of tuples written before AORelationVersion_Aligned64bit
 *	  that contain a misaligned attribute (MPP-7372);
 *	- swapping of the first two 16-bit header fields of numeric datums when
 *	  PG82NumericConversionNeeded(formatversion) is true.
 *
 * Returns 'mtup' itself, with *shouldFree set to false, when no conversion
 * is needed.  Otherwise returns a newly built tuple (allocated in the
 * current memory context by memtuple_form/memtuple_copy) with *shouldFree
 * set to true; the caller owns that tuple.
 *
 * The deform scratch arrays are static, live in TopMemoryContext and are
 * shared by all calls.
 */
static MemTuple
upgrade_tuple(AppendOnlyExecutorReadBlock *executorReadBlock,
			  MemTuple mtup, MemTupleBinding *pbind, int formatversion, bool *shouldFree)
{
	TupleDesc	tupdesc = pbind->tupdesc;
	const int	natts = tupdesc->natts;
	MemTuple	newtuple;
	int			i;

	static Datum *values = NULL;
	static bool *isnull = NULL;
	static int	nallocated = 0;

	bool		convert_alignment = false;
	bool		convert_numerics = false;

	/*
	 * MPP-7372: If the AO table was created before the fix for this issue, it
	 * may contain tuples with misaligned bindings. Here we check if the
	 * stored memtuple is problematic and then create a clone of the tuple
	 * with properly aligned bindings to be used by the executor.
	 */
	if (formatversion < AORelationVersion_Aligned64bit &&
		memtuple_has_misaligned_attribute(mtup, pbind))
		convert_alignment = true;

	if (PG82NumericConversionNeeded(formatversion))
	{
		/*
		 * On first call, figure out which columns are numerics, or domains
		 * over numerics.
		 */
		if (executorReadBlock->numericAtts == NULL)
		{
			int		   *numericAtts;
			int			n = 0;

			/*
			 * The list is cached on the read block, so it must live as long
			 * as the read block does: allocate it in the read block's own
			 * context (the same one used for mt_bind) rather than in
			 * whatever context happens to be current.
			 */
			numericAtts = (int *) MemoryContextAlloc(executorReadBlock->memoryContext,
													 natts * sizeof(int));

			for (i = 0; i < natts; i++)
			{
				Oid			typeoid;

				typeoid = getBaseType(TupleDescAttr(tupdesc, i)->atttypid);
				if (typeoid == NUMERICOID)
					numericAtts[n++] = i;
			}

			/*
			 * Publish only once the list is complete, so that an error
			 * raised while building it cannot leave a non-NULL list paired
			 * with an unset count.
			 */
			executorReadBlock->numNumericAtts = n;
			executorReadBlock->numericAtts = numericAtts;
		}

		/* If there were any numeric columns, we need to convert them. */
		if (executorReadBlock->numNumericAtts > 0)
			convert_numerics = true;
	}

	if (!convert_alignment && !convert_numerics)
	{
		/* No conversion required. Return the original tuple unmodified. */
		*shouldFree = false;
		return mtup;
	}

	/* Conversion is needed. */

	/* enlarge the arrays if needed */
	if (natts > nallocated)
	{
		/*
		 * Clear the static pointers and the recorded capacity as the old
		 * arrays are released.  Should one of the allocations below raise
		 * an error, the next call must not free the arrays a second time
		 * nor write into freed memory.
		 */
		if (values)
		{
			pfree(values);
			values = NULL;
		}
		if (isnull)
		{
			pfree(isnull);
			isnull = NULL;
		}
		nallocated = 0;

		values = (Datum *) MemoryContextAlloc(TopMemoryContext, natts * sizeof(Datum));
		isnull = (bool *) MemoryContextAlloc(TopMemoryContext, natts * sizeof(bool));
		nallocated = natts;
	}

	if (convert_alignment)
	{
		/* get attribute values from mis-aligned tuple */
		memtuple_deform_misaligned(mtup, pbind, values, isnull);
		/* Form a new, properly-aligned, tuple */
		newtuple = memtuple_form(pbind, values, isnull);
	}
	else
	{
		/*
		 * make a modifiable copy
		 */
		newtuple = memtuple_copy(mtup);
	}

	/*
	 * NOTE: we do this *after* creating the new tuple, so that we can modify
	 * the new, copied, tuple in-place.
	 */
	if (convert_numerics)
	{
		/*
		 * Get pointers to the datums within the tuple
		 */
		memtuple_deform(newtuple, pbind, values, isnull);

		for (i = 0; i < executorReadBlock->numNumericAtts; i++)
		{
			/*
			 * Before PostgreSQL 8.3, the n_weight and n_sign_dscale fields
			 * were the other way 'round. Swap them.
			 */
			int			attno = executorReadBlock->numericAtts[i];
			char	   *numericdata;
			uint16		tmp;

			if (isnull[attno])
				continue;

			numericdata = VARDATA_ANY(DatumGetPointer(values[attno]));

			/* memcpy avoids assuming the fields are 2-byte aligned */
			memcpy(&tmp, &numericdata[0], 2);
			memcpy(&numericdata[0], &numericdata[2], 2);
			memcpy(&numericdata[2], &tmp, 2);
		}
	}

	*shouldFree = true;
	return newtuple;
}


/*
 * Create the read block's memtuple binding on first use.
 *
 * The binding is built from the slot's tuple descriptor and not from the
 * relation's, because the two can differ (alter table rewrite is one
 * example).  It is allocated in the read block's own memory context.  Does
 * nothing when a binding already exists.
 */
static void
AOExecutorReadBlockBindingInit(AppendOnlyExecutorReadBlock *executorReadBlock,
									   TupleTableSlot *slot)
{
	MemoryContext callerContext;

	if (executorReadBlock->mt_bind != NULL)
		return;					/* already set up */

	callerContext = MemoryContextSwitchTo(executorReadBlock->memoryContext);
	executorReadBlock->mt_bind = create_memtuple_binding(slot->tts_tupleDescriptor);
	MemoryContextSwitchTo(callerContext);
}


/*
 * AppendOnlyExecutorReadBlock_ProcessTuple
 *
 * Load one memtuple read from a block into 'slot' as a virtual tuple.
 *
 * The slot's tts_tid is set to the AO tuple id built from the read block's
 * segment file number and 'rowNum'.  Tuples stored in an older format
 * version are converted first; a converted copy is handed over to the slot,
 * which is then flagged to free it.  'tupleLen' is only used for debug
 * logging.
 *
 * Returns true when the tuple passes the scan keys (always, when 'key' is
 * NULL), false otherwise.  No visibility test is performed here.
 */
static bool
AppendOnlyExecutorReadBlock_ProcessTuple(AppendOnlyExecutorReadBlock *executorReadBlock,
										 int64 rowNum,
										 MemTuple tuple,
										 int32 tupleLen,
										 int nkeys,
										 ScanKey key,
										 TupleTableSlot *slot)
{
	/* The AO tuple id is built in place inside an item pointer. */
	ItemPointerData fake_ctid;
	AOTupleId  *aoTupleId = (AOTupleId *) &fake_ctid;
	int			formatVersion = executorReadBlock->storageRead->formatVersion;
	bool		shouldFree = false;
	bool		valid = true;

	AORelationVersion_CheckValid(formatVersion);

	AOTupleIdInit(aoTupleId, executorReadBlock->segmentFileNum, rowNum);

	if (slot != NULL)
		AOExecutorReadBlockBindingInit(executorReadBlock, slot);

	/*
	 * Is it legal to call this function with a NULL slot?  Everything from
	 * here on, including the HeapKeyTestUsingSlot call, requires one.
	 */
	Assert(slot);
	Assert(executorReadBlock->mt_bind);

	/*
	 * Bring tuples stored in an older format up to the latest one.
	 *
	 * GPDB_12_MERGE_FIXME: Is pg_upgrade from old versions still a thing?
	 * Can we drop this?
	 */
	if (formatVersion < AORelationVersion_GetLatest())
		tuple = upgrade_tuple(executorReadBlock, tuple,
							  executorReadBlock->mt_bind,
							  formatVersion, &shouldFree);

	ExecClearTuple(slot);
	memtuple_deform(tuple, executorReadBlock->mt_bind,
					slot->tts_values, slot->tts_isnull);
	slot->tts_tid = fake_ctid;

	if (shouldFree)
	{
		/*
		 * Hand the converted memtuple to the slot via its data pointer, so
		 * that it is freed automatically once it is no longer needed.
		 */
		VirtualTupleTableSlot *vslot = (VirtualTupleTableSlot *) slot;

		Assert(TTS_IS_VIRTUAL(slot));
		Assert(vslot->data == NULL);
		Assert(!TTS_SHOULDFREE(slot));

		slot->tts_flags |= TTS_FLAG_SHOULDFREE;
		vslot->data = (char *) tuple;
	}
	ExecStoreVirtualTuple(slot);

	/* No visibility test here: every tuple is treated as visible. */

	if (key != NULL)
		HeapKeyTestUsingSlot(slot, nkeys, key, valid);

	elogif(Debug_appendonly_print_scan_tuple && valid, LOG,
		   "Append-only scan tuple for table '%s' "
		   "(AOTupleId %s, tuple length %d, memtuple length %d, block offset in file " INT64_FORMAT ")",
		   AppendOnlyStorageRead_RelationName(executorReadBlock->storageRead),
		   AOTupleIdToString(aoTupleId),
		   tupleLen,
		   memtuple_get_size(tuple),
		   executorReadBlock->headerOffsetInFile);

	return valid;
}

/*
 * AppendOnlyExecutorReadBlock_ScanNextTuple
 *
 * Advance to the next tuple of the current block that passes the scan keys
 * and load it into 'slot'.
 *
 * Returns true when such a tuple was stored in the slot.  Returns false when
 * the block holds no further matching tuple; in that case the block's rows
 * have been accounted for through
 * AppendOnlyExecutionReadBlock_FinishedScanBlock, and the caller is expected
 * to fetch a new block.
 *
 * Raises ERROR when the executor block kind is not recognized.
 *
 * Every path that runs out of tuples leaves through the single epilogue at
 * the bottom of the function.  An earlier "varblock sanity check" placed
 * behind the item loop could never execute, because that loop was only ever
 * left through a return statement; it has been removed.
 */
static bool
AppendOnlyExecutorReadBlock_ScanNextTuple(AppendOnlyExecutorReadBlock *executorReadBlock,
										  int nkeys,
										  ScanKey key,
										  TupleTableSlot *slot)
{
	MemTuple	tuple;

	Assert(slot);

	switch (executorReadBlock->executorBlockKind)
	{
		case AoExecutorBlockKind_VarBlock:

			/*
			 * Walk the remaining items (tuples) of the varblock until one
			 * matches or the block is exhausted.
			 */
			for (;;)
			{
				int			itemLen = 0;
				uint8	   *itemPtr;
				int64		rowNum;

				itemPtr = VarBlockReaderGetNextItemPtr(&executorReadBlock->varBlockReader,
													   &itemLen);

				/* no more items in the varblock, get new buffer */
				if (itemPtr == NULL)
					break;

				/* Zero-length items are counted too, keeping row numbers right. */
				executorReadBlock->currentItemCount++;
				executorReadBlock->totalRowsScannned++;

				if (itemLen <= 0)
					continue;

				tuple = (MemTuple) itemPtr;

				/* currentItemCount is 1-based at this point. */
				rowNum = executorReadBlock->blockFirstRowNum +
					executorReadBlock->currentItemCount - INT64CONST(1);

				if (AppendOnlyExecutorReadBlock_ProcessTuple(executorReadBlock,
															 rowNum,
															 tuple,
															 itemLen,
															 nkeys,
															 key,
															 slot))
					return true;
			}
			break;

		case AoExecutorBlockKind_SingleRow:
			{
				int32		singleRowLen;

				/* The row was already consumed: force fetching a new block. */
				if (executorReadBlock->singleRow == NULL)
					break;

				Assert(executorReadBlock->singleRowLen != 0);

				tuple = (MemTuple) executorReadBlock->singleRow;
				singleRowLen = executorReadBlock->singleRowLen;

				/*
				 * Indicated used up for scan.
				 */
				executorReadBlock->singleRow = NULL;
				executorReadBlock->singleRowLen = 0;

				executorReadBlock->totalRowsScannned++;

				if (AppendOnlyExecutorReadBlock_ProcessTuple(executorReadBlock,
															 executorReadBlock->blockFirstRowNum,
															 tuple,
															 singleRowLen,
															 nkeys,
															 key,
															 slot))
					return true;
			}
			break;

		default:
			elog(ERROR, "Unrecognized append-only executor block kind: %d",
				 executorReadBlock->executorBlockKind);
			break;
	}

	/* No match, or the block is used up. */
	AppendOnlyExecutionReadBlock_FinishedScanBlock(executorReadBlock);
	return false;
}

/*
 * Fetch one specific row from the block currently held by executorReadBlock.
 *
 * rowNum must fall inside the block's row range (asserted).  The row is
 * located by its 0-based offset from blockFirstRowNum and then handed to
 * AppendOnlyExecutorReadBlock_ProcessTuple() together with nkeys/key and
 * slot.
 *
 * Returns the result of ProcessTuple (true on a match, false otherwise).
 */
static bool
AppendOnlyExecutorReadBlock_FetchTuple(AppendOnlyExecutorReadBlock *executorReadBlock,
									   int64 rowNum,
									   int nkeys,
									   ScanKey key,
									   TupleTableSlot *slot)
{
	MemTuple	tuple = NULL;
	int32		tupleLen = 0;
	int			itemNum;

	Assert(rowNum >= executorReadBlock->blockFirstRowNum);
	Assert(rowNum <=
		   executorReadBlock->blockFirstRowNum +
		   executorReadBlock->rowCount - 1);

	/* 0-based position of the requested row inside this block. */
	itemNum = (int) (rowNum - executorReadBlock->blockFirstRowNum);

	/* First work out where the tuple lives and how long it is. */
	switch (executorReadBlock->executorBlockKind)
	{
		case AoExecutorBlockKind_VarBlock:
			{
				int			itemLen;
				uint8	   *itemPtr;

				itemPtr = VarBlockReaderGetItemPtr(&executorReadBlock->varBlockReader,
												   itemNum,
												   &itemLen);
				Assert(itemPtr != NULL);

				tuple = (MemTuple) itemPtr;
				tupleLen = itemLen;
			}
			break;

		case AoExecutorBlockKind_SingleRow:
			/* A single-row block can only ever hold item 0. */
			Assert(itemNum == 0);
			Assert(executorReadBlock->singleRow != NULL);
			Assert(executorReadBlock->singleRowLen != 0);

			tuple = (MemTuple) executorReadBlock->singleRow;
			tupleLen = executorReadBlock->singleRowLen;
			break;

		default:
			elog(ERROR, "Unrecognized append-only executor block kind: %d",
				 executorReadBlock->executorBlockKind);
			return false;		/* No match. */
	}

	/* Then let the common tuple-processing routine decide on a match. */
	return AppendOnlyExecutorReadBlock_ProcessTuple(executorReadBlock,
													rowNum,
													tuple,
													tupleLen,
													nkeys,
													key,
													slot);
}

/* ------------------------------------------------------------------------------ */

/*
 * Advance the scan to its next "executor" AO block.
 *
 * Opens the next segment file first when the scan asks for one.  On success
 * the block's header information has been read, a block directory entry has
 * been inserted (only when the scan carries a block directory), and the
 * block contents have been fetched.
 *
 * Returns false when no segment file could be opened, or when the current
 * file yields no further block; in the latter case the file is closed.
 */
static bool
getNextBlock(AppendOnlyScanDesc scan)
{
	AppendOnlyExecutorReadBlock *readBlock = &scan->executorReadBlock;

	/* Need to open a new segment file? */
	if (scan->aos_need_new_segfile && !SetNextFileSegForRead(scan))
		return false;

	if (AppendOnlyExecutorReadBlock_GetBlockInfo(&scan->storageRead, readBlock))
	{
		if (scan->blockDirectory)
			AppendOnlyBlockDirectory_InsertEntry(scan->blockDirectory,
												 0,
												 readBlock->blockFirstRowNum,
												 readBlock->headerOffsetInFile,
												 readBlock->rowCount,
												 false);

		AppendOnlyExecutorReadBlock_GetContents(readBlock);

		return true;
	}

	/* done reading the file */
	if (scan->blockDirectory)
		AppendOnlyBlockDirectory_End_forInsert(scan->blockDirectory);

	CloseScannedFileSeg(scan);

	return false;
}

/* ----------------
 *		appendonlygettup - fetch next appendonly tuple
 *
 *		Moves forward through the relation's blocks, asking the block
 *		reader for the next tuple matching the scan keys (the reader is
 *		handed 'slot').  Returns true for the first such tuple that is
 *		visible.  Returns false once all segment files have been read or,
 *		for scans flagged SO_TYPE_ANALYZE, as soon as an invisible tuple
 *		is encountered.
 *
 * Note: nkeys/key are passed separately, even though they are also kept in
 * the scan descriptor, because the caller may not want the scankeys checked.
 * ----------------
 */
static bool
appendonlygettup(AppendOnlyScanDesc scan,
				 ScanDirection dir pg_attribute_unused(),
				 int nkeys,
				 ScanKey key,
				 TupleTableSlot *slot)
{
	Assert(ScanDirectionIsForward(dir));
	Assert(scan->usableBlockSize > 0);

	/* With SnapshotAny the visibility map is not consulted at all. */
	bool		isSnapshotAny = (scan->snapshot == SnapshotAny);

	for (;;)
	{
		AOTupleId  *aoTupleId;

		if (scan->bufferDone)
		{
			/*
			 * Keep asking for blocks until one is obtained, or until all
			 * 'segment' files of this relation have been consumed.
			 */
			while (!getNextBlock(scan))
			{
				if (scan->aos_done_all_segfiles)
					return false;	/* whole relation has been read */
			}

			scan->bufferDone = false;
		}

		if (!AppendOnlyExecutorReadBlock_ScanNextTuple(&scan->executorReadBlock,
													   nkeys,
													   key,
													   slot))
		{
			/* no more items in the varblock, get new buffer */
			scan->bufferDone = true;
			continue;
		}

		/* The slot's TID identifies the row for the visibility check. */
		aoTupleId = (AOTupleId *) &slot->tts_tid;

		if (isSnapshotAny ||
			AppendOnlyVisimap_IsVisible(&scan->visibilityMap, aoTupleId))
			return true;		/* The tuple is visible */

		/*
		 * The tuple is invisible.  In `analyze` we simply report "no tuple";
		 * any other scan moves on to the next candidate.
		 */
		if ((scan->rs_base.rs_flags & SO_TYPE_ANALYZE) != 0)
			return false;
	}
}

/*
 * Give back the write buffer that was handed out for the current block, if
 * there is one, and forget the pointer to it.  Does nothing when no buffer
 * is outstanding.
 */
static void
cancelLastBuffer(AppendOnlyInsertDesc aoInsertDesc)
{
	if (aoInsertDesc->nonCompressedData == NULL)
	{
		/* Nothing outstanding; the storage layer must agree. */
		Assert(!AppendOnlyStorageWrite_IsBufferAllocated(&aoInsertDesc->storageWrite));
		return;
	}

	Assert(AppendOnlyStorageWrite_IsBufferAllocated(&aoInsertDesc->storageWrite));
	AppendOnlyStorageWrite_CancelLastBuffer(&aoInsertDesc->storageWrite);
	aoInsertDesc->nonCompressedData = NULL;
}

/*
 * Prepare the insert descriptor for writing a new block: record the block's
 * first row number, obtain a fresh buffer from the storage write layer and
 * initialize the VarBlock maker on top of it.
 *
 * Must only be called when no buffer is currently outstanding (asserted).
 */
static void
setupNextWriteBlock(AppendOnlyInsertDesc aoInsertDesc)
{
	Assert(aoInsertDesc->nonCompressedData == NULL);
	Assert(!AppendOnlyStorageWrite_IsBufferAllocated(&aoInsertDesc->storageWrite));

	/* The new block starts right after the last row number handed out. */
	aoInsertDesc->blockFirstRowNum = aoInsertDesc->lastSequence + 1;
	AppendOnlyStorageWrite_SetFirstRowNum(&aoInsertDesc->storageWrite,
										  aoInsertDesc->blockFirstRowNum);

	aoInsertDesc->nonCompressedData =
		AppendOnlyStorageWrite_GetBuffer(&aoInsertDesc->storageWrite,
										 AoHeaderKind_SmallContent);

	/*
	 * Prepare our VarBlock for items.  Leave room for the Append-Only
	 * Storage header.
	 */
	VarBlockMakerInit(&aoInsertDesc->varBlockMaker,
					  aoInsertDesc->nonCompressedData,
					  aoInsertDesc->maxDataLen,
					  aoInsertDesc->tempSpace,
					  aoInsertDesc->tempSpaceLen);

	aoInsertDesc->bufferCount += 1;
}

/*
 * Complete the block currently being assembled by the VarBlock maker.
 *
 * An empty block is simply cancelled.  Otherwise the VarBlock is finished
 * (and collapsed to a single-row block when it holds exactly one item),
 * handed to the storage write layer, and a block directory entry is
 * inserted for it.  On return no write buffer is outstanding.
 */
static void
finishWriteBlock(AppendOnlyInsertDesc aoInsertDesc)
{
	int			itemCount = VarBlockMakerItemCount(&aoInsertDesc->varBlockMaker);
	int			executorBlockKind;
	int32		dataLen;

	if (itemCount == 0)
	{
		/* Nothing was added: "cancel" the last block allocation, if one. */
		cancelLastBuffer(aoInsertDesc);
		return;
	}

	dataLen = VarBlockMakerFinish(&aoInsertDesc->varBlockMaker);

	aoInsertDesc->varblockCount++;

	if (itemCount == 1)
	{
		/* One item only: rewrite the buffer in place as a single row. */
		dataLen = VarBlockCollapseToSingleItem(
											   /* target */ aoInsertDesc->nonCompressedData,
											   /* source */ aoInsertDesc->nonCompressedData,
											   /* sourceLen */ dataLen);
		executorBlockKind = AoExecutorBlockKind_SingleRow;
	}
	else
		executorBlockKind = AoExecutorBlockKind_VarBlock;

	/* Record where this logical block starts before it is written. */
	aoInsertDesc->storageWrite.logicalBlockStartOffset =
		BufferedAppendNextBufferPosition(&(aoInsertDesc->storageWrite.bufferedAppend));

	AppendOnlyStorageWrite_FinishBuffer(&aoInsertDesc->storageWrite,
										dataLen,
										executorBlockKind,
										itemCount);
	aoInsertDesc->nonCompressedData = NULL;
	Assert(!AppendOnlyStorageWrite_IsBufferAllocated(&aoInsertDesc->storageWrite));

	elogif(Debug_appendonly_print_insert, LOG,
		   "Append-only insert finished uncompressed block for table '%s' "
		   "(length = %d, application specific %d, item count %d, block count " INT64_FORMAT ")",
		   NameStr(aoInsertDesc->aoi_rel->rd_rel->relname),
		   dataLen,
		   executorBlockKind,
		   itemCount,
		   aoInsertDesc->bufferCount);

	/* Insert an entry to the block directory */
	AppendOnlyBlockDirectory_InsertEntry(&aoInsertDesc->blockDirectory,
										 0,
										 aoInsertDesc->blockFirstRowNum,
										 AppendOnlyStorageWrite_LogicalBlockStartOffset(&aoInsertDesc->storageWrite),
										 itemCount,
										 false);

	Assert(aoInsertDesc->nonCompressedData == NULL);
	Assert(!AppendOnlyStorageWrite_IsBufferAllocated(&aoInsertDesc->storageWrite));
}

/* ----------------------------------------------------------------
 *					 append-only access method interface
 * ----------------------------------------------------------------
 */

/*
 * appendonly_beginrangescan_internal()
 *
 * Begins a scan over a subset of segment info files and returns the newly
 * allocated scan descriptor.
 *
 * Should only be called with valid seginfos for the given relation.
 * Should only be called with an aoentry based on the same snapshot.
 *
 * The ownership of the seginfos and aoentry are transferred to the scan
 * descriptor.
 */
static AppendOnlyScanDesc
appendonly_beginrangescan_internal(Relation relation,
								   Snapshot snapshot,
								   Snapshot appendOnlyMetaDataSnapshot,
								   FileSegInfo **seginfo,
								   int segfile_count,
								   int nkeys,
								   ScanKey key,
								   ParallelTableScanDesc parallel_scan,
								   uint32 flags)
{
	AppendOnlyScanDesc scan;
	AppendOnlyStorageAttributes *attr;
	StringInfoData titleBuf;
	NameData	compresstype;
	int32		blocksize;
	int32		safefswritesize;
	int16		compresslevel;
	bool		checksum;
	bool		uncompressed;

	GetAppendOnlyEntryAttributes(relation->rd_id, &blocksize, &safefswritesize, &compresslevel, &checksum, &compresstype);

	/*
	 * Pin the relcache entry for the lifetime of the scan.  The caller
	 * should be holding the rel open anyway, so this is redundant in all
	 * normal scenarios; it just makes really sure the entry cannot go away
	 * while the scan points at it.
	 */
	RelationIncrementReferenceCount(relation);

	/* Allocate a zeroed scan descriptor and fill in the generic part. */
	scan = (AppendOnlyScanDesc) palloc0(sizeof(AppendOnlyScanDescData));

	scan->rs_base.rs_rd = relation;
	scan->rs_base.rs_snapshot = snapshot;
	scan->rs_base.rs_nkeys = nkeys;
	scan->rs_base.rs_flags = flags;
	scan->rs_base.rs_parallel = parallel_scan;

	/* Append-only specific state. */
	scan->aos_filenamepath_maxlen = AOSegmentFilePathNameLen(relation) + 1;
	scan->aos_filenamepath = (char *) palloc(scan->aos_filenamepath_maxlen);
	scan->aos_filenamepath[0] = '\0';
	scan->usableBlockSize = blocksize;
	scan->aos_rd = relation;
	scan->appendOnlyMetaDataSnapshot = appendOnlyMetaDataSnapshot;
	scan->snapshot = snapshot;
	scan->aos_nkeys = nkeys;
	scan->aoScanInitContext = CurrentMemoryContext;

	initStringInfo(&titleBuf);
	appendStringInfo(&titleBuf, "Scan of Append-Only Row-Oriented relation '%s'",
					 RelationGetRelationName(relation));
	scan->title = titleBuf.data;

	/*
	 * Fill in Append-Only Storage layer attributes; they describe the
	 * AppendOnly format to be scanned.  An empty compression type and
	 * "none" (any case) both mean the data is not compressed.
	 */
	attr = &scan->storageAttributes;

	uncompressed = (strcmp(NameStr(compresstype), "") == 0 ||
					pg_strcasecmp(NameStr(compresstype), "none") == 0);

	attr->compress = !uncompressed;
	attr->compressType = uncompressed ? "none" : pstrdup(NameStr(compresstype));
	attr->compressLevel = compresslevel;
	attr->checksum = checksum;
	attr->safeFSWriteSize = safefswritesize;

	/* UNDONE: We are calling the static header length routine here. */
	scan->maxDataLen =
		scan->usableBlockSize -
		AppendOnlyStorageFormat_RegularHeaderLenNeeded(scan->storageAttributes.checksum);

	/* Remember the file segments we need to scan. */
	scan->aos_segfile_arr = seginfo;
	scan->aos_total_segfiles = segfile_count;

	/*
	 * The key array is allocated here rather than in initscan() because
	 * appendonly_rescan also calls initscan() and we don't want to allocate
	 * memory again.
	 */
	scan->aos_key = (nkeys > 0)
		? (ScanKey) palloc(sizeof(ScanKeyData) * nkeys)
		: NULL;

	/* pgstat_initstats(relation); */
	initscan(scan, key);

	scan->blockDirectory = NULL;

	/* The visibility map is only set up when there is something to scan. */
	if (segfile_count > 0)
	{
		Oid			visimaprelid;
		Oid			visimapidxid;

		GetAppendOnlyEntryAuxOids(relation->rd_id, NULL,
								  NULL, NULL, NULL, &visimaprelid, &visimapidxid);

		AppendOnlyVisimap_Init(&scan->visibilityMap,
							   visimaprelid,
							   visimapidxid,
							   AccessShareLock,
							   appendOnlyMetaDataSnapshot);
	}

	return scan;
}

/*
 * appendonly_beginrangescan
 *
 * Begins a relation scan limited to the segment files whose numbers are
 * listed in segfile_no_arr (segfile_count entries).  The segment file
 * information is looked up with appendOnlyMetaDataSnapshot and handed over
 * to the scan descriptor.
 */
AppendOnlyScanDesc
appendonly_beginrangescan(Relation relation,
						  Snapshot snapshot,
						  Snapshot appendOnlyMetaDataSnapshot,
						  int *segfile_no_arr, int segfile_count,
						  int nkeys, ScanKey keys)
{
	FileSegInfo **seginfo;

	/* Look up the segment file information for each requested segment. */
	seginfo = palloc0(sizeof(FileSegInfo *) * segfile_count);

	for (int segidx = 0; segidx < segfile_count; segidx++)
		seginfo[segidx] = GetFileSegInfo(relation,
										 appendOnlyMetaDataSnapshot,
										 segfile_no_arr[segidx],
										 false);

	return appendonly_beginrangescan_internal(relation,
											  snapshot,
											  appendOnlyMetaDataSnapshot,
											  seginfo,
											  segfile_count,
											  nkeys,
											  keys,
											   /* parallel_scan */ NULL,
											   /* flags */ 0);
}

/* ----------------
 *		appendonly_beginscan	- begin relation scan
 *
 *		Starts a scan over all segment files of the relation and returns
 *		the new scan descriptor as a generic TableScanDesc.
 * ----------------
 */
TableScanDesc
appendonly_beginscan(Relation relation,
					 Snapshot snapshot,
					 int nkeys, struct ScanKeyData *key,
					 ParallelTableScanDesc pscan,
					 uint32 flags)
{
	Snapshot	appendOnlyMetaDataSnapshot;
	FileSegInfo **seginfo;
	int			segfile_count;

	/*
	 * The append-only meta data should never be fetched with SnapshotAny as
	 * bogus results are returned.  We use SnapshotSelf for metadata in that
	 * case, as a regular MVCC snapshot can hide newly globally inserted
	 * tuples from a global index build process.
	 */
	appendOnlyMetaDataSnapshot =
		(snapshot == SnapshotAny) ? SnapshotSelf : snapshot;

	/* Collect the segment file information for this table. */
	seginfo = GetAllFileSegInfo(relation,
								appendOnlyMetaDataSnapshot,
								&segfile_count,
								NULL);

	return (TableScanDesc) appendonly_beginrangescan_internal(relation,
															  snapshot,
															  appendOnlyMetaDataSnapshot,
															  seginfo,
															  segfile_count,
															  nkeys,
															  key,
															  pscan,
															  flags);
}

/* ----------------
 *		appendonly_rescan		- restart a relation scan
 *
 *		Closes the segment file being scanned, ends the storage read
 *		session and the executor read block, and re-runs initscan() with
 *		the given key so the scan starts over with a new segment file.
 *		The set_params/allow_* arguments are not used here.
 *
 * TODO: instead of freeing resources here and reallocating them in initscan
 * over and over see which of them can be refactored into appendonly_beginscan
 * and persist there until endscan is finally reached. For now this will do.
 *
 * GPDB_12_MERGE_FIXME: what to do with the new flags?
 * ----------------
 */
void
appendonly_rescan(TableScanDesc scan, ScanKey key,
				  bool set_params, bool allow_strat,
				  bool allow_sync, bool allow_pagemode)
{
	AppendOnlyScanDesc aoscan = (AppendOnlyScanDesc) scan;

	/* Tear down the per-file and per-session read state. */
	CloseScannedFileSeg(aoscan);
	AppendOnlyStorageRead_FinishSession(&aoscan->storageRead);
	aoscan->initedStorageRoutines = false;

	AppendOnlyExecutorReadBlock_Finish(&aoscan->executorReadBlock);

	/* The next block request has to open a segment file again. */
	aoscan->aos_need_new_segfile = true;

	/* Reinitialize the scan descriptor. */
	initscan(aoscan, key);
}

/* ----------------
 *		appendonly_endscan	- end relation scan
 *
 *		Drops the relation reference taken when the scan began, shuts
 *		down the read machinery and frees the memory owned by the scan
 *		descriptor, including the descriptor itself.
 * ----------------
 */
void
appendonly_endscan(TableScanDesc scan)
{
	AppendOnlyScanDesc aoscan = (AppendOnlyScanDesc) scan;

	RelationDecrementReferenceCount(aoscan->aos_rd);

	if (aoscan->aos_key != NULL)
		pfree(aoscan->aos_key);

	/* The scan owns both the segment info array and its elements. */
	if (aoscan->aos_segfile_arr != NULL)
	{
		FileSegInfo **segfiles = aoscan->aos_segfile_arr;
		int			nsegfiles = aoscan->aos_total_segfiles;
		int			idx;

		for (idx = 0; idx < nsegfiles; idx++)
			pfree(segfiles[idx]);

		pfree(segfiles);
	}

	/* Shut down file, storage session and block reader state. */
	CloseScannedFileSeg(aoscan);
	AppendOnlyStorageRead_FinishSession(&aoscan->storageRead);
	aoscan->initedStorageRoutines = false;
	AppendOnlyExecutorReadBlock_Finish(&aoscan->executorReadBlock);

	/* The visibility map was only initialized when segfiles were present. */
	if (aoscan->aos_total_segfiles > 0)
		AppendOnlyVisimap_Finish(&aoscan->visibilityMap, AccessShareLock);

	if (aoscan->aofetch != NULL)
	{
		appendonly_fetch_finish(aoscan->aofetch);
		pfree(aoscan->aofetch);
		aoscan->aofetch = NULL;
	}

	pfree(aoscan->aos_filenamepath);
	pfree(aoscan->title);
	pfree(aoscan);
}

/* ----------------
 *		appendonly_getnextslot - retrieve next tuple in scan
 *
 *		Returns true when appendonlygettup() found a tuple; the heap
 *		"getnext" statistic is counted for the relation.  Otherwise the
 *		slot (if given) is cleared and false is returned.
 * ----------------
 */
bool
appendonly_getnextslot(TableScanDesc scan, ScanDirection direction, TupleTableSlot *slot)
{
	AppendOnlyScanDesc aoscan = (AppendOnlyScanDesc) scan;

	if (!appendonlygettup(aoscan, direction, aoscan->rs_base.rs_nkeys, aoscan->aos_key, slot))
	{
		/* Nothing found: make sure the caller sees an empty slot. */
		if (slot)
			ExecClearTuple(slot);

		return false;
	}

	pgstat_count_heap_getnext(aoscan->aos_rd);

	return true;
}

/*
 * Close the segment file that the fetch descriptor's storage reader has
 * open, and record in the descriptor that no segment file is open.
 *
 * Must only be called while a segment file is open (asserted).
 */
static void
closeFetchSegmentFile(AppendOnlyFetchDesc aoFetchDesc)
{
	Assert(aoFetchDesc->currentSegmentFile.isOpen);

	AppendOnlyStorageRead_CloseFile(&aoFetchDesc->storageRead);

	/* Flip the flag only after the storage layer has closed the file. */
	aoFetchDesc->currentSegmentFile.isOpen = false;
}

/*
 * Open segment file number openSegmentFileNum for fetching.
 *
 * The segment is looked up in the descriptor's segment file information.
 * Returns false, leaving no file open, when the segment is not listed
 * there, is awaiting drop, has a zero EOF, or cannot be opened by the
 * storage layer.  On success the descriptor's currentSegmentFile is updated
 * and true is returned.
 *
 * Must only be called while no segment file is open (asserted).
 */
static bool
openFetchSegmentFile(AppendOnlyFetchDesc aoFetchDesc,
					 int openSegmentFileNum)
{
	FileSegInfo *fsInfo = NULL;
	int64		logicalEof;
	int32		fileSegNo;
	int			i;

	Assert(!aoFetchDesc->currentSegmentFile.isOpen);

	/* Find the catalog information for the requested segment file. */
	for (i = 0; i < aoFetchDesc->totalSegfiles; i++)
	{
		FileSegInfo *candidate = aoFetchDesc->segmentFileInfo[i];

		if (candidate->segno == openSegmentFileNum)
		{
			fsInfo = candidate;
			break;
		}
	}

	/* Segment file not visible in catalog information. */
	if (fsInfo == NULL)
		return false;

	/*
	 * File compacted, but not dropped. All information are declared
	 * invisible.
	 */
	if (fsInfo->state == AOSEG_STATE_AWAITING_DROP)
		return false;

	/*
	 * Don't try to open a segment file when its EOF is 0, since the file may
	 * not exist. See MPP-8280.
	 */
	logicalEof = (int64) fsInfo->eof;
	if (logicalEof == 0)
		return false;

	MakeAOSegmentFileName(aoFetchDesc->relation,
						  openSegmentFileNum, -1,
						  &fileSegNo,
						  aoFetchDesc->segmentFileName);
	Assert(strlen(aoFetchDesc->segmentFileName) + 1 <=
		   aoFetchDesc->segmentFileNameMaxLen);

	/* UNDONE: Appropriate to use Try here? */
	if (!AppendOnlyStorageRead_TryOpenFile(&aoFetchDesc->storageRead,
										   aoFetchDesc->segmentFileName,
										   fsInfo->formatversion,
										   logicalEof))
		return false;

	/* Remember which file is now open and how far it may be read. */
	aoFetchDesc->currentSegmentFile.num = openSegmentFileNum;
	aoFetchDesc->currentSegmentFile.logicalEof = logicalEof;
	aoFetchDesc->currentSegmentFile.isOpen = true;

	return true;
}

/*
 * Read the header information of the next block and copy it into the fetch
 * descriptor's currentBlock.  The block contents are not fetched here;
 * gotContents is reset to false.
 *
 * Returns false when no further block is available in the current range.
 */
static bool
fetchNextBlock(AppendOnlyFetchDesc aoFetchDesc)
{
	AppendOnlyExecutorReadBlock *readBlock = &aoFetchDesc->executorReadBlock;
	AOFetchBlockMetadata *block = &aoFetchDesc->currentBlock;

	/* Try to read next block. */
	if (!AppendOnlyExecutorReadBlock_GetBlockInfo(&aoFetchDesc->storageRead,
												  readBlock))
		return false;			/* Hit end of range. */

	/* Unpack information into member variables. */
	block->valid = true;
	block->fileOffset = readBlock->headerOffsetInFile;
	block->overallBlockLen =
		AppendOnlyStorageRead_OverallBlockLen(&aoFetchDesc->storageRead);

	/* Row numbers covered by the block, both ends inclusive. */
	block->firstRowNum = readBlock->blockFirstRowNum;
	block->lastRowNum = readBlock->blockFirstRowNum + readBlock->rowCount - 1;

	block->isCompressed = readBlock->isCompressed;
	block->isLargeContent = readBlock->isLarge;

	block->gotContents = false;

	return true;
}

/*
 * Fetch row rowNum from the fetch descriptor's current block.
 *
 * The block must be valid and must cover rowNum (asserted).  The block's
 * contents are obtained on first use and remembered via gotContents.  The
 * tuple is fetched without any scan keys.
 */
static bool
fetchFromCurrentBlock(AppendOnlyFetchDesc aoFetchDesc,
					  int64 rowNum,
					  TupleTableSlot *slot)
{
	AOFetchBlockMetadata *block = &aoFetchDesc->currentBlock;

	Assert(block->valid);
	Assert(rowNum >= block->firstRowNum);
	Assert(rowNum <= block->lastRowNum);

	if (!block->gotContents)
	{
		/* Do decompression if necessary and get contents. */
		AppendOnlyExecutorReadBlock_GetContents(&aoFetchDesc->executorReadBlock);
		block->gotContents = true;
	}

	return AppendOnlyExecutorReadBlock_FetchTuple(&aoFetchDesc->executorReadBlock,
												  rowNum,
												   /* nkeys */ 0,
												   /* key */ NULL,
												  slot);
}

/*
 * Set the next scan position (file offset and row number) to the beginning
 * of the range described by the current block directory entry.
 */
static void
positionFirstBlockOfRange(AppendOnlyFetchDesc aoFetchDesc)
{
	AOFetchBlockMetadata *block = &aoFetchDesc->currentBlock;

	AppendOnlyBlockDirectoryEntry_GetBeginRange(&block->blockDirectoryEntry,
												&aoFetchDesc->scanNextFileOffset,
												&aoFetchDesc->scanNextRowNum);
}

/*
 * Set the scan limit (file offset to stop at and last row number) to the
 * end of the range described by the current block directory entry.
 */
static void
positionLimitToEndOfRange(AppendOnlyFetchDesc aoFetchDesc)
{
	AOFetchBlockMetadata *block = &aoFetchDesc->currentBlock;

	AppendOnlyBlockDirectoryEntry_GetEndRange(&block->blockDirectoryEntry,
											  &aoFetchDesc->scanAfterFileOffset,
											  &aoFetchDesc->scanLastRowNum);
}


/*
 * Move the next scan position just past the current block: the file offset
 * right after it and the row number following its last row.
 */
static void
positionSkipCurrentBlock(AppendOnlyFetchDesc aoFetchDesc)
{
	AOFetchBlockMetadata *block = &aoFetchDesc->currentBlock;

	aoFetchDesc->scanNextFileOffset = block->fileOffset + block->overallBlockLen;
	aoFetchDesc->scanNextRowNum = block->lastRowNum + 1;
}

/*
 * Scan through blocks to find row.
 *
 * Reads blocks from scanNextFileOffset up to scanAfterFileOffset until one
 * covering rowNum is found, then fetches the row from it.  Blocks that end
 * before rowNum are skipped (counted in skipBlockCount).
 *
 * If row is not represented in any of the blocks covered by the Block
 * Directory, then the row falls into a row gap.  The row must have been
 * aborted or deleted and reclaimed.  False is returned in that case.
 */
static bool
scanToFetchTuple(AppendOnlyFetchDesc aoFetchDesc,
				 int64 rowNum,
				 TupleTableSlot *slot)
{
	AOFetchBlockMetadata *block = &aoFetchDesc->currentBlock;

	/* No more blocks requested for range. */
	if (aoFetchDesc->scanNextFileOffset >= aoFetchDesc->scanAfterFileOffset)
		return false;

	/*
	 * No more blocks in this file when the next position is at the logical
	 * EOF.  UNDONE: Why does our next scan position ever go beyond logical
	 * EOF?  That case is treated the same way.
	 */
	if (aoFetchDesc->currentSegmentFile.logicalEof <=
		aoFetchDesc->scanNextFileOffset)
		return false;

	/* Temporarily restrict our reading to just the range. */
	AppendOnlyStorageRead_SetTemporaryRange(&aoFetchDesc->storageRead,
											aoFetchDesc->scanNextFileOffset,
											aoFetchDesc->scanAfterFileOffset);
	AppendOnlyExecutionReadBlock_SetSegmentFileNum(&aoFetchDesc->executorReadBlock,
												   aoFetchDesc->currentSegmentFile.num);
	AppendOnlyExecutionReadBlock_SetPositionInfo(&aoFetchDesc->executorReadBlock,
												 aoFetchDesc->scanNextRowNum);

	aoFetchDesc->skipBlockCount = 0;

	for (;;)
	{
		/* Fetch block starting at scanNextFileOffset. */
		if (!fetchNextBlock(aoFetchDesc))
			return false;		/* No more blocks. */

		/* Examine new current block header information. */
		if (rowNum < block->firstRowNum)
		{
			/*
			 * Since we have read a new block, the temporary range for the
			 * read needs to be adjusted accordingly. Otherwise, the
			 * underlying bufferedRead may stop reading more data because of
			 * the previously-set smaller temporary range.
			 */
			int64		blockBegin = block->fileOffset;
			int64		blockEnd = block->fileOffset + block->overallBlockLen;

			AppendOnlyStorageRead_SetTemporaryRange(&aoFetchDesc->storageRead,
													blockBegin,
													blockEnd);

			return false;		/* Row fell in gap between blocks. */
		}

		if (rowNum <= block->lastRowNum)
			return fetchFromCurrentBlock(aoFetchDesc, rowNum, slot);

		/* The row lies beyond this block: get ready for the next one. */
		Assert(!block->gotContents);

		/* MPP-17061: reach the end of range covered by block directory entry */
		if ((block->fileOffset + block->overallBlockLen) >=
			aoFetchDesc->scanAfterFileOffset)
			return false;

		AppendOnlyExecutionReadBlock_FinishedScanBlock(&aoFetchDesc->executorReadBlock);
		AppendOnlyStorageRead_SkipCurrentBlock(&aoFetchDesc->storageRead);
		aoFetchDesc->skipBlockCount++;
	}
}

/*
 * Mark the given block metadata as not describing any block: clear its row
 * range and its valid flag.
 */
static void
resetCurrentBlockInfo(AOFetchBlockMetadata * currentBlock)
{
	currentBlock->firstRowNum = 0;
	currentBlock->lastRowNum = 0;
	currentBlock->valid = false;
}

/*
 * appendonly_fetch_init -- set up a descriptor for fetching tuples by TID.
 *
 * Allocates an AppendOnlyFetchDesc in the current memory context and
 * initializes everything a later appendonly_fetch() needs: the storage
 * attributes, the segment file information, the per-segment last sequence
 * numbers, the storage reader (with its compression state, if any), the
 * executor read block, the block directory (for searching) and the
 * visibility map.
 *
 * 'snapshot' is remembered for tuple visibility decisions;
 * 'appendOnlyMetaDataSnapshot' is used for the append-only metadata.
 *
 * A reference on 'relation' is taken here.
 */
AppendOnlyFetchDesc
appendonly_fetch_init(Relation relation,
					  Snapshot snapshot,
					  Snapshot appendOnlyMetaDataSnapshot)
{
	AppendOnlyFetchDesc				aoFetchDesc;
	AppendOnlyStorageAttributes	   *attr;
	PGFunction					   *fns;
	StringInfoData					titleBuf;
	FormData_pg_appendonly			aoFormData;
	int								segno;

	GetAppendOnlyEntry(relation->rd_id, &aoFormData);

	/*
	 * increment relation ref count while scanning relation
	 *
	 * This is just to make really sure the relcache entry won't go away while
	 * the scan has a pointer to it.  Caller should be holding the rel open
	 * anyway, so this is redundant in all normal scenarios...
	 */
	RelationIncrementReferenceCount(relation);

	/*
	 * allocate scan descriptor
	 */
	aoFetchDesc = (AppendOnlyFetchDesc) palloc0(sizeof(AppendOnlyFetchDescData));

	aoFetchDesc->relation = relation;
	aoFetchDesc->appendOnlyMetaDataSnapshot = appendOnlyMetaDataSnapshot;
	aoFetchDesc->snapshot = snapshot;

	aoFetchDesc->initContext = CurrentMemoryContext;

	aoFetchDesc->segmentFileNameMaxLen = AOSegmentFilePathNameLen(relation) + 1;
	aoFetchDesc->segmentFileName =
		(char *) palloc(aoFetchDesc->segmentFileNameMaxLen);
	aoFetchDesc->segmentFileName[0] = '\0';

	initStringInfo(&titleBuf);
	appendStringInfo(&titleBuf, "Fetch of Append-Only Row-Oriented relation '%s'",
					 RelationGetRelationName(relation));
	aoFetchDesc->title = titleBuf.data;

	/*
	 * Fill in Append-Only Storage layer attributes.
	 */
	attr = &aoFetchDesc->storageAttributes;

	/*
	 * These attributes describe the AppendOnly format to be scanned.
	 */
	if (strcmp(NameStr(aoFormData.compresstype), "") == 0 ||
		pg_strcasecmp(NameStr(aoFormData.compresstype), "none") == 0)
	{
		attr->compress = false;
		attr->compressType = "none";
	}
	else
	{
		attr->compress = true;

		/*
		 * aoFormData is a local variable, so the name must be copied: the
		 * fetch descriptor keeps this pointer after we return.  This
		 * matches what the scan path does for its storage attributes.
		 */
		attr->compressType = pstrdup(NameStr(aoFormData.compresstype));
	}
	attr->compressLevel = aoFormData.compresslevel;
	attr->checksum = aoFormData.checksum;
	attr->safeFSWriteSize = aoFormData.safefswritesize;
	aoFetchDesc->usableBlockSize = aoFormData.blocksize;

	/*
	 * Get information about all the file segments we need to scan
	 */
	aoFetchDesc->segmentFileInfo =
		GetAllFileSegInfo(
						  relation,
						  appendOnlyMetaDataSnapshot,
						  &aoFetchDesc->totalSegfiles,
						  NULL);

	/*
	 * Initializing lastSequence only for the segments obtained above is
	 * sufficient; doing it for all AOTupleId_MultiplierSegmentFileNum
	 * possible segments would introduce too many unnecessary calls in most
	 * cases.  All other entries keep the byte pattern written by the memset
	 * below (every bit set).
	 */
	memset(aoFetchDesc->lastSequence, -1, sizeof(aoFetchDesc->lastSequence));
	for (int i = -1; i < aoFetchDesc->totalSegfiles; i++)
	{
		/* i == -1 stands for segment 0, which is always initialized */
		segno = (i < 0 ? 0 : aoFetchDesc->segmentFileInfo[i]->segno);
		/* record the last sequence number of the target segment */
		aoFetchDesc->lastSequence[segno] = ReadLastSequence(aoFormData.segrelid, segno);
	}

	AppendOnlyStorageRead_Init(
							   &aoFetchDesc->storageRead,
							   aoFetchDesc->initContext,
							   aoFetchDesc->usableBlockSize,
							   NameStr(aoFetchDesc->relation->rd_rel->relname),
							   aoFetchDesc->title,
							   &aoFetchDesc->storageAttributes);


	fns = get_funcs_for_compression(NameStr(aoFormData.compresstype));
	aoFetchDesc->storageRead.compression_functions = fns;

	/* Build the decompression state when compression functions exist. */
	if (fns)
	{
		PGFunction	cons = fns[COMPRESSION_CONSTRUCTOR];
		CompressionState *cs;
		StorageAttributes sa;

		sa.comptype = NameStr(aoFormData.compresstype);
		sa.complevel = aoFormData.compresslevel;
		sa.blocksize = aoFormData.blocksize;


		cs = callCompressionConstructor(cons, RelationGetDescr(relation),
										&sa,
										false /* decompress */ );
		aoFetchDesc->storageRead.compressionState = cs;
	}

	AppendOnlyExecutorReadBlock_Init(
									 &aoFetchDesc->executorReadBlock,
									 aoFetchDesc->relation,
									 aoFetchDesc->initContext,
									 &aoFetchDesc->storageRead,
									 aoFetchDesc->usableBlockSize);

	AppendOnlyBlockDirectory_Init_forSearch(
											&aoFetchDesc->blockDirectory,
											appendOnlyMetaDataSnapshot,
											aoFetchDesc->segmentFileInfo,
											aoFetchDesc->totalSegfiles,
											aoFetchDesc->relation,
											1,
											false,
											NULL);

	AppendOnlyVisimap_Init(&aoFetchDesc->visibilityMap,
						   aoFormData.visimaprelid,
						   aoFormData.visimapidxid,
						   AccessShareLock,
						   appendOnlyMetaDataSnapshot);

	return aoFetchDesc;

}

/*
 * appendonly_fetch -- fetch the tuple for a given tid.
 *
 * The tid is split into a segment file number and a row number.  The row is
 * looked for first in the block that is already current, then in the range
 * of the current Block Directory entry, and finally via a fresh Block
 * Directory lookup (opening or switching the segment file as needed).
 *
 * If the 'slot' is not NULL, the fetched tuple will be assigned to the slot;
 * on every "not found" path a non-NULL slot is cleared.
 *
 * Unless the fetch snapshot is SnapshotAny, rows that the visibility map
 * reports as invisible are treated as not found.
 *
 * Return true if such a tuple is found. Otherwise, return false.  An error
 * is raised when the segment file's last sequence was never initialized for
 * this fetch descriptor.
 */
bool
appendonly_fetch(AppendOnlyFetchDesc aoFetchDesc,
				 AOTupleId *aoTupleId,
				 TupleTableSlot *slot)
{
	int			segmentFileNum = AOTupleIdGet_segmentFileNum(aoTupleId);
	int64		rowNum = AOTupleIdGet_rowNum(aoTupleId);
	bool		isSnapshotAny = (aoFetchDesc->snapshot == SnapshotAny);

	Assert(segmentFileNum >= 0);

	/* rowNum is an int64, so it must be printed with INT64_FORMAT. */
	if (aoFetchDesc->lastSequence[segmentFileNum] == InvalidAORowNum)
		ereport(ERROR,
				(errcode(ERRCODE_INTERNAL_ERROR),
				 errmsg("Row No. " INT64_FORMAT " in segment file No. %d is out of scanning scope for target relfilenode %u.",
				 		rowNum, segmentFileNum, aoFetchDesc->relation->rd_node.relNode)));

	/*
	 * This is an improvement for brin. BRIN index stores ranges of TIDs in
	 * terms of block numbers and not specific TIDs, so it's possible that the
	 * fetch function is called with a non-existent TID. The function
	 * appendonly_fetch will access the block directory table first and cache
	 * some MinipageEntrys. If we try to access the non-existent tid, a cache
	 * miss will occur. And we need to search the btree on block directory
	 * table. This is a very slow operation. So a fast return path was added
	 * here. If the rowNum is bigger than lastsequence, skip it.
	 */
	if (rowNum > aoFetchDesc->lastSequence[segmentFileNum])
	{
		if (slot != NULL)
		{
			ExecClearTuple(slot);
		}
		return false;	/* row number was never handed out for this segment. */
	}

	/*
	 * Do we have a current block?  If it has the requested tuple, that would
	 * be a great performance optimization.
	 */
	if (aoFetchDesc->currentBlock.valid)
	{
		if (aoFetchDesc->currentSegmentFile.isOpen &&
			segmentFileNum == aoFetchDesc->currentSegmentFile.num &&
			segmentFileNum == aoFetchDesc->blockDirectory.currentSegmentFileNum &&
			segmentFileNum == aoFetchDesc->executorReadBlock.segmentFileNum)
		{
			if (rowNum >= aoFetchDesc->currentBlock.firstRowNum &&
				rowNum <= aoFetchDesc->currentBlock.lastRowNum &&
				AppendOnlyBlockDirectoryEntry_RangeHasRow(
														  &aoFetchDesc->currentBlock.blockDirectoryEntry,
														  rowNum))
			{
				if (!isSnapshotAny && !AppendOnlyVisimap_IsVisible(&aoFetchDesc->visibilityMap, aoTupleId))
				{
					if (slot != NULL)
					{
						ExecClearTuple(slot);
					}
					return false;	/* row has been deleted or updated. */
				}
				return fetchFromCurrentBlock(aoFetchDesc, rowNum, slot);
			}

			/*
			 * Otherwise, if the current Block Directory entry covers the
			 * request tuples, lets use its information as another performance
			 * optimization.
			 */
			if (AppendOnlyBlockDirectoryEntry_RangeHasRow(
														  &aoFetchDesc->currentBlock.blockDirectoryEntry,
														  rowNum))
			{
				/*
				 * The tuple is covered by the current Block Directory entry,
				 * but is it before or after our current block?
				 */
				if (rowNum < aoFetchDesc->currentBlock.firstRowNum)
				{
					/*
					 * XXX This could happen when an insert is cancelled. In
					 * that case, we fetched the next block that has a higher
					 * firstRowNum when we try to find the first cancelled
					 * row. So for the second or any cancelled row, we enter
					 * here, and re-read the previous block. This seems
					 * inefficient.
					 *
					 * We may be able to fix this by adding an entry to the
					 * block directory for those cancelled inserts.
					 */

					/*
					 * Set scan range to prior blocks.
					 */
					positionFirstBlockOfRange(aoFetchDesc);

					/* Set limit to before current block. */
					aoFetchDesc->scanAfterFileOffset =
						aoFetchDesc->currentBlock.fileOffset;

					aoFetchDesc->scanLastRowNum =
						aoFetchDesc->currentBlock.firstRowNum - 1;
				}
				else
				{
					/*
					 * Set scan range to following blocks.
					 */
					positionSkipCurrentBlock(aoFetchDesc);

					positionLimitToEndOfRange(aoFetchDesc);
				}

				if (!isSnapshotAny && !AppendOnlyVisimap_IsVisible(&aoFetchDesc->visibilityMap, aoTupleId))
				{
					if (slot != NULL)
					{
						ExecClearTuple(slot);
					}
					return false;	/* row has been deleted or updated. */
				}

				if (scanToFetchTuple(aoFetchDesc, rowNum, slot))
					return true;

				if (slot != NULL)
					ExecClearTuple(slot);
				return false;
				/* Row was not found in the scanned range. */
			}
		}
	}

	/*
	 * Open or switch open, if necessary.
	 */
	if (aoFetchDesc->currentSegmentFile.isOpen &&
		segmentFileNum != aoFetchDesc->currentSegmentFile.num)
	{
#ifdef USE_ASSERT_CHECKING
		/*
		 * Currently, we only support Index Scan on bitmap index and Bitmap Index Scan
		 * on AO tables, so normally the below warning should not happen.
		 * See get_index_paths in indxpath.c.
		 */
		if (segmentFileNum < aoFetchDesc->currentSegmentFile.num)
			ereport(WARNING,
					(errmsg("append-only fetch requires scan prior segment file: segmentFileNum %d, rowNum " INT64_FORMAT ", currentSegmentFileNum %d",
							segmentFileNum, rowNum,
							aoFetchDesc->currentSegmentFile.num)));
#endif
		closeFetchSegmentFile(aoFetchDesc);

		Assert(!aoFetchDesc->currentSegmentFile.isOpen);
	}

	if (!aoFetchDesc->currentSegmentFile.isOpen)
	{
		if (!openFetchSegmentFile(
								  aoFetchDesc,
								  segmentFileNum))
		{
			if (slot != NULL)
				ExecClearTuple(slot);
			return false;
			/* Segment file not in aoseg table.. */
			/* Must be aborted or deleted and reclaimed. */
		}

		/* Reset currentBlock info */
		resetCurrentBlockInfo(&(aoFetchDesc->currentBlock));
	}

	/*
	 * Need to get the Block Directory entry that covers the TID.
	 */
	if (!AppendOnlyBlockDirectory_GetEntry(
										   &aoFetchDesc->blockDirectory,
										   aoTupleId,
										   0,
										   &aoFetchDesc->currentBlock.blockDirectoryEntry))
	{
		if (slot != NULL)
		{
			ExecClearTuple(slot);
		}
		return false;			/* Row not represented in Block Directory. */
		/* Must be aborted or deleted and reclaimed. */
	}

	if (!isSnapshotAny && !AppendOnlyVisimap_IsVisible(&aoFetchDesc->visibilityMap, aoTupleId))
	{
		if (slot != NULL)
		{
			ExecClearTuple(slot);
		}
		return false;			/* row has been deleted or updated. */
	}


	/*
	 * Set scan range covered by new Block Directory entry.
	 */
	positionFirstBlockOfRange(aoFetchDesc);

	positionLimitToEndOfRange(aoFetchDesc);

	if (scanToFetchTuple(aoFetchDesc, rowNum, slot))
		return true;

	if (slot != NULL)
		ExecClearTuple(slot);
	return false;
	/* Row was not found in the scanned range. */
}

/*
 * appendonly_fetch_finish
 *
 * Release the resources held by a fetch descriptor: the reference on the
 * relation, the storage-read session (and any open segment file), the
 * executor read block, the block directory search state, the cached segment
 * file info array, the visibility map, and the strings owned by the
 * descriptor.
 *
 * The descriptor structure itself is not freed here.  Because it outlives
 * this call, every pointer member released below is reset to NULL so that
 * no dangling pointer is left behind in it.
 */
void
appendonly_fetch_finish(AppendOnlyFetchDesc aoFetchDesc)
{
	RelationDecrementReferenceCount(aoFetchDesc->relation);

	/* Close the current segment file before ending the read session. */
	AppendOnlyStorageRead_CloseFile(&aoFetchDesc->storageRead);

	AppendOnlyStorageRead_FinishSession(&aoFetchDesc->storageRead);

	AppendOnlyExecutorReadBlock_Finish(&aoFetchDesc->executorReadBlock);

	AppendOnlyBlockDirectory_End_forSearch(&aoFetchDesc->blockDirectory);

	/* The segment file info array is optional; free it only if present. */
	if (aoFetchDesc->segmentFileInfo)
	{
		FreeAllSegFileInfo(aoFetchDesc->segmentFileInfo, aoFetchDesc->totalSegfiles);
		pfree(aoFetchDesc->segmentFileInfo);
		aoFetchDesc->segmentFileInfo = NULL;
	}

	AppendOnlyVisimap_Finish(&aoFetchDesc->visibilityMap, AccessShareLock);

	pfree(aoFetchDesc->segmentFileName);
	aoFetchDesc->segmentFileName = NULL;

	/* Clear the pointer after freeing, like the other members above. */
	pfree(aoFetchDesc->title);
	aoFetchDesc->title = NULL;
}

/*
 * appendonly_delete_init
 *
 * Build and return the delete descriptor that appendonly_delete() operates
 * on.  This must be called before deleting tuples from the append-only
 * segment files of 'rel'.
 *
 * The descriptor records the relation and the currently active snapshot,
 * and carries a visibility map (opened with RowExclusiveLock) together with
 * the visimap-delete state layered on top of it.  Release it with
 * appendonly_delete_finish().
 */
AppendOnlyDeleteDesc
appendonly_delete_init(Relation rel)
{
	AppendOnlyDeleteDesc deleteDesc;
	Oid			visimapRelOid;
	Oid			visimapIdxOid;

	Assert(!IsolationUsesXactSnapshot());

	/* Only the visimap relation and its index are of interest here. */
	GetAppendOnlyEntryAuxOids(rel->rd_id,
							  NULL, NULL, NULL, NULL,
							  &visimapRelOid, &visimapIdxOid);

	deleteDesc = palloc0(sizeof(AppendOnlyDeleteDescData));
	deleteDesc->aod_rel = rel;
	deleteDesc->appendOnlyMetaDataSnapshot = GetActiveSnapshot();

	AppendOnlyVisimap_Init(&deleteDesc->visibilityMap,
						   visimapRelOid,
						   visimapIdxOid,
						   RowExclusiveLock,
						   deleteDesc->appendOnlyMetaDataSnapshot);

	/* The delete state works on the visibility map initialized above. */
	AppendOnlyVisimapDelete_Init(&deleteDesc->visiMapDelete,
								 &deleteDesc->visibilityMap);

	return deleteDesc;
}

/*
 * appendonly_delete_finish
 *
 * Counterpart of appendonly_delete_init(): finish the visimap-delete state,
 * then the visibility map itself (passing NoLock), and finally free the
 * descriptor.  The descriptor must not be used after this call.
 */
void
appendonly_delete_finish(AppendOnlyDeleteDesc aoDeleteDesc)
{
	Assert(aoDeleteDesc);

	/* Finish the delete state first; it was built on the visibility map. */
	AppendOnlyVisimapDelete_Finish(&(aoDeleteDesc->visiMapDelete));
	AppendOnlyVisimap_Finish(&(aoDeleteDesc->visibilityMap), NoLock);

	pfree(aoDeleteDesc);
}

/*
 * appendonly_delete
 *
 * Delete the tuple identified by 'aoTupleId' by hiding it through the
 * visimap-delete state of 'aoDeleteDesc'.  The result of
 * AppendOnlyVisimapDelete_Hide() is returned to the caller unchanged.
 */
TM_Result
appendonly_delete(AppendOnlyDeleteDesc aoDeleteDesc,
				  AOTupleId *aoTupleId)
{
	Relation	rel;

	Assert(aoDeleteDesc);
	Assert(aoTupleId);

	rel = aoDeleteDesc->aod_rel;

	elogif(Debug_appendonly_print_delete, LOG,
		   "Append-only delete tuple from table '%s' (AOTupleId %s)",
		   NameStr(rel->rd_rel->relname),
		   AOTupleIdToString(aoTupleId));

#ifdef FAULT_INJECTOR
	FaultInjector_InjectFaultIfSet("appendonly_delete",
								   DDLNotSpecified,
								   "",	/* databaseName */
								   RelationGetRelationName(rel));	/* tableName */
#endif

	return AppendOnlyVisimapDelete_Hide(&aoDeleteDesc->visiMapDelete,
										aoTupleId);
}

/*
 * appendonly_insert_init
 *
 * 'segno' must be a segment that has been previously locked for this
 * transaction, by calling LockSegnoForWrite() or ChooseSegnoForWrite().
 *
 * Before using appendonly_insert() to insert tuples we need to call
 * this function to initialize our varblock and bufferedAppend structures
 * and memory for appending data into the relation file.
 *
 * The returned descriptor is palloc'd and must be released with
 * appendonly_insert_finish().
 *
 * See appendonly_insert() for more specifics about inserting tuples into
 * append only tables.
 */
AppendOnlyInsertDesc
appendonly_insert_init(Relation rel, int segno)
{
	AppendOnlyInsertDesc aoInsertDesc;
	int			maxtupsize;
	int64		firstSequence = 0;
	PGFunction *fns;
	int			desiredOverflowBytes = 0;
	size_t		(*desiredCompressionSize) (size_t input);

	AppendOnlyStorageAttributes *attr;

	StringInfoData titleBuf;
	Oid			segrelid;
	int32		blocksize;
	int32		safefswritesize;
	int16		compresslevel;
	bool		checksum;
	NameData	compresstype;

	/*
	 * Get the pg_appendonly information for this table.
	 */
	GetAppendOnlyEntryAttributes(rel->rd_id, &blocksize, &safefswritesize, &compresslevel, &checksum, &compresstype);

	/*
	 * allocate and initialize the insert descriptor
	 */
	aoInsertDesc = (AppendOnlyInsertDesc) palloc0(sizeof(AppendOnlyInsertDescData));

	aoInsertDesc->aoi_rel = rel;

	/*
	 * We want to see an up-to-date view of the metadata. The target segment's
	 * pg_aoseg row is already locked for us.
	 */
	aoInsertDesc->appendOnlyMetaDataSnapshot = RegisterSnapshot(GetCatalogSnapshot(InvalidOid));

	aoInsertDesc->mt_bind = create_memtuple_binding(RelationGetDescr(rel));

	aoInsertDesc->appendFile = -1;
	aoInsertDesc->appendFilePathNameMaxLen = AOSegmentFilePathNameLen(rel) + 1;
	aoInsertDesc->appendFilePathName = (char *) palloc(aoInsertDesc->appendFilePathNameMaxLen);
	aoInsertDesc->appendFilePathName[0] = '\0';

	aoInsertDesc->bufferCount = 0;
	aoInsertDesc->blockFirstRowNum = 0;
	aoInsertDesc->insertCount = 0;
	aoInsertDesc->varblockCount = 0;
	aoInsertDesc->rowCount = 0;

	Assert(segno >= 0);
	aoInsertDesc->cur_segno = segno;

	/*
	 * Although variable length blocks of AO should be able to accommodate variable length
	 * datums, we still need to keep TOAST for AO_ROW to benefit to performance when query
	 * in-line data.
	 */
	aoInsertDesc->useNoToast = !(rel->rd_tableam->relation_needs_toast_table(rel));

	aoInsertDesc->usableBlockSize = blocksize;

	attr = &aoInsertDesc->storageAttributes;

	/*
	 * These attributes describe the AppendOnly format to be written.
	 */
	if (strcmp(NameStr(compresstype), "") == 0 ||
		pg_strcasecmp(NameStr(compresstype), "none") == 0)
	{
		attr->compress = false;
		attr->compressType = "none";
	}
	else
	{
		attr->compress = true;

		/*
		 * 'compresstype' is a local variable of this function, whereas the
		 * storage attributes live in the palloc'd insert descriptor that is
		 * returned to the caller.  Store a copy of the name so that the
		 * descriptor never points into this function's stack frame.
		 */
		attr->compressType = pstrdup(NameStr(compresstype));
	}
	attr->compressLevel = compresslevel;
	attr->checksum = checksum;
	attr->safeFSWriteSize = safefswritesize;

	fns = get_funcs_for_compression(NameStr(compresstype));

	CompressionState *cs = NULL;
	CompressionState *verifyCs = NULL;

	if (fns)
	{
		PGFunction	cons = fns[COMPRESSION_CONSTRUCTOR];
		StorageAttributes sa;

		sa.comptype = NameStr(compresstype);
		sa.complevel = compresslevel;
		sa.blocksize = blocksize;

		cs = callCompressionConstructor(cons, RelationGetDescr(rel),
										&sa,
										true /* compress */ );

		/* A decompression state is only built when write verification is on. */
		if (gp_appendonly_verify_write_block == true)
		{
			verifyCs = callCompressionConstructor(cons, RelationGetDescr(rel),
												  &sa,
												  false /* decompress */ );
		}

		desiredCompressionSize = cs->desired_sz;
		if (desiredCompressionSize != NULL)
		{
			/*
			 * Call the compression's desired size function to find out what
			 * additional space it requires for our block size.
			 */
			desiredOverflowBytes =
				(int) (desiredCompressionSize) (aoInsertDesc->usableBlockSize)
				- aoInsertDesc->usableBlockSize;
			Assert(desiredOverflowBytes >= 0);
		}
	}

	aoInsertDesc->storageAttributes.overflowSize = desiredOverflowBytes;

	/* The title string is owned by the descriptor from here on. */
	initStringInfo(&titleBuf);
	appendStringInfo(&titleBuf, "Write of Append-Only Row-Oriented relation '%s'",
					 RelationGetRelationName(aoInsertDesc->aoi_rel));
	aoInsertDesc->title = titleBuf.data;

	AppendOnlyStorageWrite_Init(
								&aoInsertDesc->storageWrite,
								NULL,
								aoInsertDesc->usableBlockSize,
								RelationGetRelationName(aoInsertDesc->aoi_rel),
								aoInsertDesc->title,
								&aoInsertDesc->storageAttributes,
								XLogIsNeeded() && RelationNeedsWAL(aoInsertDesc->aoi_rel));

	aoInsertDesc->storageWrite.compression_functions = fns;
	aoInsertDesc->storageWrite.compressionState = cs;
	aoInsertDesc->storageWrite.verifyWriteCompressionState = verifyCs;

	elogif(Debug_appendonly_print_insert, LOG,
		   "Append-only insert initialize for table '%s' segment file %u "
		   "(compression = %s, compression type %s, compression level %d)",
		   NameStr(aoInsertDesc->aoi_rel->rd_rel->relname),
		   aoInsertDesc->cur_segno,
		   (attr->compress ? "true" : "false"),
		   NameStr(compresstype),
		   attr->compressLevel);

	/*
	 * Temporarily set the firstRowNum for the block so that we can calculate
	 * the correct header length.
	 */
	AppendOnlyStorageWrite_SetFirstRowNum(&aoInsertDesc->storageWrite,
										  1);

	aoInsertDesc->completeHeaderLen =
		AppendOnlyStorageWrite_CompleteHeaderLen(
												 &aoInsertDesc->storageWrite,
												 AoHeaderKind_SmallContent);

	/* Whatever the header does not use of a block is available for data. */
	aoInsertDesc->maxDataLen =
		aoInsertDesc->usableBlockSize -
		aoInsertDesc->completeHeaderLen;

	aoInsertDesc->tempSpaceLen = aoInsertDesc->usableBlockSize / 8; /* TODO - come up with a
																	 * more efficient
																	 * calculation */
	aoInsertDesc->tempSpace = (uint8 *) palloc(aoInsertDesc->tempSpaceLen * sizeof(uint8));
	maxtupsize = aoInsertDesc->maxDataLen - VARBLOCK_HEADER_LEN - 4;
	aoInsertDesc->toast_tuple_threshold = maxtupsize / 4;	/* see tuptoaster.h for
															 * more information */
	aoInsertDesc->toast_tuple_target = maxtupsize / 4;

	/* open our current relation file segment for write */
	SetCurrentFileSegForWrite(aoInsertDesc);

	Assert(aoInsertDesc->tempSpaceLen > 0);

	/*
	 * Obtain the next list of fast sequences for this relation.
	 *
	 * Even in the case of no indexes, we need to update the fast sequences,
	 * since the table may contain indexes at some point of time.
	 */
	Assert(aoInsertDesc->fsInfo->segno == segno);

	GetAppendOnlyEntryAuxOids(aoInsertDesc->aoi_rel->rd_id, NULL, &segrelid,
			NULL, NULL, NULL, NULL);

	firstSequence =
		GetFastSequences(segrelid,
						 segno,
						 NUM_FAST_SEQUENCES);
	aoInsertDesc->numSequences = NUM_FAST_SEQUENCES;

	/*
	 * Set last_sequence value: appendonly_insert() increments it before
	 * assigning a row number, so start one below the first allocated value.
	 */
	Assert(firstSequence > aoInsertDesc->rowCount);
	aoInsertDesc->lastSequence = firstSequence - 1;

	setupNextWriteBlock(aoInsertDesc);

	/*
	 * Initialize the block directory.
	 *
	 * CONCERN: Safe to assume all block directory entries for segment are
	 * "covered" by same exclusive lock.
	 */
	AppendOnlyBlockDirectory_Init_forInsert(
											&(aoInsertDesc->blockDirectory),
											aoInsertDesc->appendOnlyMetaDataSnapshot,
											aoInsertDesc->fsInfo, aoInsertDesc->lastSequence,
											rel, segno, 1, false);

	return aoInsertDesc;
}


/*
 *	appendonly_insert		- insert tuple into a varblock
 *
 * Stores 'instup' in the segment file the insert descriptor is writing to
 * and reports the identity of the new row through '*aoTupleId' (built from
 * the descriptor's current segment number and the next row sequence).
 *
 * Tuples are placed into VarBlocks managed by the insert descriptor rather
 * than through the PostgreSQL buffer/page manager.  The steps are:
 *
 * - If the relation uses TOAST and the tuple has external attributes or is
 *   larger than the toast threshold, a toasted copy is built and stored
 *   instead.  The copy is freed before returning.
 * - If the tuple does not fit into the current VarBlock (or the block has
 *   reached its row-count limit), the block is written out and a new one is
 *   started.
 * - A tuple that does not fit even into an empty VarBlock is written as
 *   large content when TOAST is not in use; otherwise an error is raised.
 * - When the preallocated row sequence numbers are exhausted, the next
 *   batch is requested.
 *
 * The caller's tuple 'instup' is only read, never modified.
 */
void
appendonly_insert(AppendOnlyInsertDesc aoInsertDesc,
				  MemTuple instup,
				  AOTupleId *aoTupleId)
{
	Relation	rel = aoInsertDesc->aoi_rel;
	MemTuple	storedTup;
	VarBlockByteLen tupLen;
	uint8	   *dest;
	bool		mustToast = false;
	bool		largeContent = false;

	Assert(aoInsertDesc->usableBlockSize > 0 && aoInsertDesc->tempSpaceLen > 0);
	Assert(aoInsertDesc->toast_tuple_threshold > 0 && aoInsertDesc->toast_tuple_target > 0);

#ifdef FAULT_INJECTOR
	FaultInjector_InjectFaultIfSet("appendonly_insert",
								   DDLNotSpecified,
								   "",	/* databaseName */
								   RelationGetRelationName(aoInsertDesc->aoi_rel));	/* tableName */
#endif

	/*
	 * Toasting is needed when the tuple is too big for storage or contains
	 * already toasted out-of-line attributes from some other relation --
	 * unless the relation does not use TOAST at all.
	 */
	if (!aoInsertDesc->useNoToast)
		mustToast = (MemTupleHasExternal(instup, aoInsertDesc->mt_bind) ||
					 memtuple_get_size(instup) > aoInsertDesc->toast_tuple_threshold);

	/*
	 * From here on, storedTup is the data actually written to the relation;
	 * instup remains the caller's original untoasted data.
	 */
	storedTup = mustToast
		? toast_insert_or_update_memtup(rel, instup,
										NULL, aoInsertDesc->mt_bind,
										aoInsertDesc->toast_tuple_target,
										false,	/* errtbl is never AO */
										0)
		: instup;

	tupLen = memtuple_get_size(storedTup);

	/*
	 * Ask the current VarBlock for room, unless it already holds the maximum
	 * number of rows the append-only storage header can count; in that case
	 * force the block to be finished.
	 */
	if (VarBlockMakerItemCount(&aoInsertDesc->varBlockMaker) < AOSmallContentHeader_MaxRowCount)
		dest = VarBlockMakerGetNextItemPtr(&aoInsertDesc->varBlockMaker, tupLen);
	else
		dest = NULL;

	if (dest == NULL)
	{
		if (VarBlockMakerItemCount(&aoInsertDesc->varBlockMaker) != 0)
		{
			/*
			 * The current VarBlock has content: write it out, start a fresh
			 * one and retry there.
			 */
			finishWriteBlock(aoInsertDesc);
			Assert(aoInsertDesc->nonCompressedData == NULL);
			Assert(!AppendOnlyStorageWrite_IsBufferAllocated(&aoInsertDesc->storageWrite));

			setupNextWriteBlock(aoInsertDesc);

			dest = VarBlockMakerGetNextItemPtr(&aoInsertDesc->varBlockMaker, tupLen);

			if (dest == NULL)
			{
				/*
				 * Case #2.  Even a fresh VarBlock cannot hold the tuple.
				 * Without TOAST it goes out as a large content
				 * multiple-block set; otherwise the user's data exceeds the
				 * limit, which gets its own errcontext.
				 */
				if (!aoInsertDesc->useNoToast)
					ereport(ERROR,
							(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
							 errmsg("item too long (check #2): length %d, maxBufferLen %d",
									tupLen, aoInsertDesc->varBlockMaker.maxBufferLen),
							 errcontext_appendonly_insert_block_user_limit(aoInsertDesc)));

				largeContent = true;
			}
		}
		else
		{
			/*
			 * Case #1.  The VarBlock is empty and the tuple still does not
			 * fit.  Without TOAST it goes out as a large content
			 * multiple-block set; otherwise the user's data exceeds the
			 * limit, which gets its own errcontext.
			 */
			if (!aoInsertDesc->useNoToast)
				ereport(ERROR,
						(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
						 errmsg("item too long (check #1): length %d, maxBufferLen %d",
								tupLen, aoInsertDesc->varBlockMaker.maxBufferLen),
						 errcontext_appendonly_insert_block_user_limit(aoInsertDesc)));

			largeContent = true;
		}
	}

	if (largeContent)
	{
		/*
		 * The tuple is written on its own, bypassing the VarBlock.
		 */
		Assert(dest == NULL);
		Assert(!mustToast);
		Assert(instup == storedTup);

		/* "Cancel" the last block allocation, if there is one. */
		cancelLastBuffer(aoInsertDesc);
		Assert(aoInsertDesc->nonCompressedData == NULL);
		Assert(!AppendOnlyStorageWrite_IsBufferAllocated(&aoInsertDesc->storageWrite));

		AppendOnlyStorageWrite_Content(&aoInsertDesc->storageWrite,
									   (uint8 *) storedTup,
									   tupLen,
									   AoExecutorBlockKind_SingleRow,
									   1 /* rowCount */ );
		Assert(aoInsertDesc->nonCompressedData == NULL);
		Assert(!AppendOnlyStorageWrite_IsBufferAllocated(&aoInsertDesc->storageWrite));

		/* Subsequent tuples go into a new VarBlock. */
		setupNextWriteBlock(aoInsertDesc);
	}
	else
	{
		/*
		 * The VarBlock handed out room for the tuple; copy it in.
		 */
		Assert(dest != NULL);

		if (tupLen > 0)
			memcpy(dest, storedTup, tupLen);
	}

	/*
	 * Account for the new row and consume one of the preallocated sequence
	 * numbers; the incremented lastSequence becomes the row number.
	 */
	aoInsertDesc->insertCount++;
	aoInsertDesc->lastSequence++;
	if (aoInsertDesc->numSequences > 0)
		(aoInsertDesc->numSequences)--;

	Assert(aoInsertDesc->numSequences >= 0);

	AOTupleIdInit(aoTupleId, aoInsertDesc->cur_segno, aoInsertDesc->lastSequence);

	/*
	 * Once the allocated fast sequence numbers are used up, request the next
	 * list of them.
	 */
	if (aoInsertDesc->numSequences == 0)
	{
		Oid			segrelid;
		int64		firstSequence;

		GetAppendOnlyEntryAuxOids(aoInsertDesc->aoi_rel->rd_id, NULL,
								  &segrelid, NULL, NULL, NULL, NULL);

		firstSequence = GetFastSequences(segrelid,
										 aoInsertDesc->cur_segno,
										 NUM_FAST_SEQUENCES);

		Assert(firstSequence == aoInsertDesc->lastSequence + 1);
		aoInsertDesc->numSequences = NUM_FAST_SEQUENCES;
	}

	elogif(Debug_appendonly_print_insert_tuple, LOG,
		   "Append-only insert tuple for table '%s' "
		   "(AOTupleId %s, memtuple length %d, isLargeRow %s, block count " INT64_FORMAT ")",
		   NameStr(aoInsertDesc->aoi_rel->rd_rel->relname),
		   AOTupleIdToString(aoTupleId),
		   tupLen,
		   (largeContent ? "true" : "false"),
		   aoInsertDesc->bufferCount);

	/* Release the toasted copy, if one was made. */
	if (storedTup != instup)
		pfree(storedTup);
}

/*
 * appendonly_insert_finish
 *
 * Must be called once all data has been handed to appendonly_insert().  It
 * flushes whatever is still buffered in the last varblock into the file,
 * closes the segment file, and then releases what the insert descriptor
 * holds: the block directory insert state, the storage-write session, the
 * registered metadata snapshot, the memtuple binding, the title string and
 * finally the descriptor itself.
 *
 * The descriptor must not be used after this call.
 */
void
appendonly_insert_finish(AppendOnlyInsertDesc aoInsertDesc)
{
	/* Write out the last, possibly partially filled, varblock. */
	finishWriteBlock(aoInsertDesc);

	CloseWritableFileSeg(aoInsertDesc);

	/* Shut down the helper state attached to the descriptor. */
	AppendOnlyBlockDirectory_End_forInsert(&aoInsertDesc->blockDirectory);
	AppendOnlyStorageWrite_FinishSession(&(aoInsertDesc->storageWrite));

	/* The metadata snapshot is unregistered here, ending the descriptor's hold on it. */
	UnregisterSnapshot(aoInsertDesc->appendOnlyMetaDataSnapshot);

	destroy_memtuple_binding(aoInsertDesc->mt_bind);

	/* Free the title before the descriptor that points to it. */
	pfree(aoInsertDesc->title);
	pfree(aoInsertDesc);
}

相关信息

greenplumn 源码目录

相关文章

greenplumn aomd 源码

greenplumn aomd_filehandler 源码

greenplumn aosegfiles 源码

greenplumn appendonly_blkdir_udf 源码

greenplumn appendonly_compaction 源码

greenplumn appendonly_visimap 源码

greenplumn appendonly_visimap_entry 源码

greenplumn appendonly_visimap_store 源码

greenplumn appendonly_visimap_udf 源码

greenplumn appendonlyam_handler 源码

0  赞