greenplumn cdbappendonlystoragewrite 源码

  • 2022-08-18
  • 浏览 (396)

greenplumn cdbappendonlystoragewrite 代码

文件路径:/src/backend/cdb/cdbappendonlystoragewrite.c

/*-------------------------------------------------------------------------
 *
 * cdbappendonlystoragewrite.c
 *
 * Portions Copyright (c) 2007-2009, Greenplum inc
 * Portions Copyright (c) 2012-Present VMware, Inc. or its affiliates.
 *
 *
 * IDENTIFICATION
 *	    src/backend/cdb/cdbappendonlystoragewrite.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#ifndef WIN32
#include <sys/fcntl.h>
#else
#include <io.h>
#endif
#include <sys/file.h>
#include <unistd.h>

#include "catalog/catalog.h"
#include "catalog/heap.h"
#include "catalog/pg_compression.h"
#include "cdb/cdbappendonlystorageread.h"
#include "cdb/cdbappendonlystorage.h"
#include "cdb/cdbappendonlystoragelayer.h"
#include "cdb/cdbappendonlystorageformat.h"
#include "cdb/cdbappendonlystoragewrite.h"
#include "cdb/cdbappendonlyxlog.h"
#include "common/relpath.h"
#include "pgstat.h"
#include "storage/gp_compress.h"
#include "utils/faultinjector.h"
#include "utils/guc.h"


/*----------------------------------------------------------------
 * Initialization
 *----------------------------------------------------------------
 */

/*
 * Initialize AppendOnlyStorageWrite.
 *
 * The AppendOnlyStorageWrite data structure is initialized once for an
 * append "session" and can be used to add Append-Only Storage Blocks to 1
 * or more segment files.
 *
 * The current file to write to is opened with the
 * AppendOnlyStorageWrite_OpenFile routine.
 *
 * storageWrite		- data structure to initialize
 * memoryContext	- memory context to use for buffers and other memory
 *					  needs. When NULL, the current memory context is used.
 * maxBufferLen		- maximum Append-Only Storage Block length including all
 *					  storage headers.
 * relationName		- name of the relation to use in log and error messages.
 * title			- A phrase that better describes the purpose of this open.
 *					  The caller manages the storage for this.
 * storageAttributes - Append-Only Storage Attributes from relation creation.
 */
void
AppendOnlyStorageWrite_Init(AppendOnlyStorageWrite *storageWrite,
							MemoryContext memoryContext,
							int32 maxBufferLen,
							char *relationName,
							char *title,
							AppendOnlyStorageAttributes *storageAttributes,
							bool needsWAL)
{
	uint8	   *memory;
	int32		memoryLen;
	MemoryContext oldMemoryContext;

	Assert(storageWrite != NULL);

	/* UNDONE: Range check maxBufferLen */

	Assert(relationName != NULL);
	Assert(storageAttributes != NULL);

	/* UNDONE: Range check fields in storageAttributes */

	MemSet(storageWrite, 0, sizeof(AppendOnlyStorageWrite));

	storageWrite->maxBufferLen = maxBufferLen;

	if (memoryContext == NULL)
		storageWrite->memoryContext = CurrentMemoryContext;
	else
		storageWrite->memoryContext = memoryContext;

	oldMemoryContext = MemoryContextSwitchTo(storageWrite->memoryContext);

	memcpy(
		   &storageWrite->storageAttributes,
		   storageAttributes,
		   sizeof(AppendOnlyStorageAttributes));

	/*
	 * Determine the fixed header length based on the checksum flag.
	 */
	storageWrite->regularHeaderLen = AoHeader_RegularSize;
	if (storageWrite->storageAttributes.checksum)
		storageWrite->regularHeaderLen += 2 * sizeof(pg_crc32);

	storageWrite->relationName = pstrdup(relationName);
	storageWrite->title = title;

	/*
	 * Set up extra buffers for compression.
	 */
	storageWrite->compressionOverrunLen = storageAttributes->overflowSize;

	elog(DEBUG2, "Requested compression overflow bytes = %d.", storageAttributes->overflowSize);

	if (storageWrite->storageAttributes.compress)
	{
		storageWrite->uncompressedBuffer = (uint8 *) palloc0(storageWrite->maxBufferLen * sizeof(uint8));
	}
	else
	{
		Assert(storageWrite->uncompressedBuffer == NULL);
	}

	/*
	 * Now that we have determined the compression overrun, we can initialize
	 * BufferedAppend with the correct maxBufferLen + compressionOverrunLen.
	 */
	storageWrite->maxBufferWithCompressionOverrrunLen =
		storageWrite->maxBufferLen + storageWrite->compressionOverrunLen;
	storageWrite->maxLargeWriteLen = 2 * storageWrite->maxBufferLen;
	Assert(storageWrite->maxBufferWithCompressionOverrrunLen <= storageWrite->maxLargeWriteLen);

	memoryLen = BufferedAppendMemoryLen
		(
		 storageWrite->maxBufferWithCompressionOverrrunLen, /* maxBufferLen */
		 storageWrite->maxLargeWriteLen);

	memory = (uint8 *) palloc(memoryLen);

	BufferedAppendInit(&storageWrite->bufferedAppend,
					   memory,
					   memoryLen,
					   storageWrite->maxBufferWithCompressionOverrrunLen,
					   storageWrite->maxLargeWriteLen,
					   relationName);

	elogif(Debug_appendonly_print_insert || Debug_appendonly_print_append_block, LOG,
		   "Append-Only Storage Write initialize for table '%s' (compression = %s, compression level %d, maximum buffer length %d, large write length %d)",
		   storageWrite->relationName,
		   (storageWrite->storageAttributes.compress ? "true" : "false"),
		   storageWrite->storageAttributes.compressLevel,
		   storageWrite->maxBufferWithCompressionOverrrunLen,
		   storageWrite->maxLargeWriteLen);

	/*
	 * When doing VerifyBlock, allocate the extra buffers.
	 */
	if (gp_appendonly_verify_write_block && storageWrite->storageAttributes.compress)
	{
		storageWrite->verifyWriteBuffer = (uint8 *) palloc(storageWrite->maxBufferLen * sizeof(uint8));
	}
	else
	{
		Assert(storageWrite->verifyWriteBuffer == NULL);
	}

	storageWrite->file = -1;
	storageWrite->formatVersion = -1;
	storageWrite->needsWAL = needsWAL;

	MemoryContextSwitchTo(oldMemoryContext);

	storageWrite->isActive = true;
}

/*
 * Finish using the AppendOnlyStorageWrite session created with ~Init.
 */
void
AppendOnlyStorageWrite_FinishSession(AppendOnlyStorageWrite *storageWrite)
{
	MemoryContext oldMemoryContext;

	if (!storageWrite->isActive)
		return;

	oldMemoryContext = MemoryContextSwitchTo(storageWrite->memoryContext);

	/*
	 * UNDONE: This expects the MemoryContent to be what was used for the
	 * 'memory' in ~Init
	 */
	BufferedAppendFinish(&storageWrite->bufferedAppend);

	if (storageWrite->relationName != NULL)
	{
		pfree(storageWrite->relationName);
		storageWrite->relationName = NULL;
	}

	if (storageWrite->uncompressedBuffer != NULL)
	{
		pfree(storageWrite->uncompressedBuffer);
		storageWrite->uncompressedBuffer = NULL;
	}

	if (storageWrite->verifyWriteBuffer != NULL)
	{
		pfree(storageWrite->verifyWriteBuffer);
		storageWrite->verifyWriteBuffer = NULL;
	}

	if (storageWrite->segmentFileName != NULL)
	{
		pfree(storageWrite->segmentFileName);
		storageWrite->segmentFileName = NULL;
	}

	if (storageWrite->compression_functions != NULL)
	{
		callCompressionDestructor(storageWrite->compression_functions[COMPRESSION_DESTRUCTOR], storageWrite->compressionState);
		if (storageWrite->compressionState != NULL)
		{
			pfree(storageWrite->compressionState);
		}

		if (storageWrite->verifyWriteCompressionState != NULL)
		{
			callCompressionDestructor(storageWrite->compression_functions[COMPRESSION_DESTRUCTOR], storageWrite->verifyWriteCompressionState);
		}
	}

	/* Deallocation is done.      Go back to caller memory-context. */
	MemoryContextSwitchTo(oldMemoryContext);

	storageWrite->isActive = false;

}

/*----------------------------------------------------------------
 * Open and FlushAndClose
 *----------------------------------------------------------------
 */

/*
 * Creates an on-demand Append-Only segment file under transaction.
 */
void
AppendOnlyStorageWrite_TransactionCreateFile(AppendOnlyStorageWrite *storageWrite,
											 RelFileNodeBackend *relFileNode,
											 int32 segmentFileNum)
{
	Assert(segmentFileNum > 0);

	/* The file might already exist. that's OK */
	// WALREP_FIXME: Pass isRedo == true, so that you don't get an error if it
	// exists already. That's currently OK, but in the future, other things
	// might depend on the isRedo flag, like whether to WAL-log the creation.
	smgrcreate_ao(*relFileNode, segmentFileNum, true);

	/*
	 * Create a WAL record, so that the segfile is also created after crash or
	 * in possible standby server. Not strictly necessarily, because a 0-length
	 * segfile and a non-existent segfile are treated the same. But the
	 * gp_replica_check tool, to compare primary and mirror, will complain if
	 * a file exists in master but not in mirror, even if it's empty.
	 */
	if (storageWrite->needsWAL)
		xlog_ao_insert(relFileNode->node, segmentFileNum, 0, NULL, 0);
}

/*
 * Opens the next segment file to write.  The file must already exist.
 *
 * This routine is responsible for seeking to the proper write location given
 * the logical EOF.
 *
 * filePathName		- name of the segment file to open.
 * version			- AO table format version the file is in.
 * logicalEof		- last committed write transaction's EOF value to use as
 *					  the end of the segment file.
 */
void
AppendOnlyStorageWrite_OpenFile(AppendOnlyStorageWrite *storageWrite,
								char *filePathName,
								int version,
								int64 logicalEof,
								int64 fileLen_uncompressed,
								RelFileNodeBackend *relFileNode,
								int32 segmentFileNum)
{
	File		file;
	MemoryContext oldMemoryContext;

	Assert(storageWrite != NULL);
	Assert(storageWrite->isActive);

	Assert(filePathName != NULL);

	/*
	 * Assume that we only write in the current latest format. (it's redundant
	 * to pass the version number as argument, currently)
	 */
	if (version != AORelationVersion_GetLatest())
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("cannot write append-only table version %d", version)));

	/*
	 * Open or create the file for write.
	 */
	char	   *path = aorelpath(*relFileNode, segmentFileNum);

	errno = 0;

	int			fileFlags = O_RDWR | PG_BINARY;
	file = PathNameOpenFile(path, fileFlags);
	if (file < 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("Append-only Storage Write could not open segment file %s \"%s\" for relation \"%s\": %m",
						path, filePathName,
						storageWrite->relationName)));

	/*
	 * Seek to the logical EOF write position.
	 */
	storageWrite->file = file;
	storageWrite->formatVersion = version;
	storageWrite->startEof = logicalEof;
	storageWrite->relFileNode = *relFileNode;
	storageWrite->segmentFileNum = segmentFileNum;

	/*
	 * When writing multiple segment files, we throw away the old segment file
	 * name strings.
	 */
	oldMemoryContext = MemoryContextSwitchTo(storageWrite->memoryContext);

	if (storageWrite->segmentFileName != NULL)
		pfree(storageWrite->segmentFileName);

	storageWrite->segmentFileName = pstrdup(filePathName);

	/* Allocation is done.  Go back to caller memory-context. */
	MemoryContextSwitchTo(oldMemoryContext);

	/*
	 * Tell the BufferedAppend module about the file we just opened.
	 */
	BufferedAppendSetFile(&storageWrite->bufferedAppend,
						  storageWrite->file,
						  storageWrite->relFileNode,
						  storageWrite->segmentFileNum,
						  storageWrite->segmentFileName,
						  logicalEof,
						  fileLen_uncompressed);
}

/*
 * Optionally pad out to next page boundary.
 *
 * Since we do not do typical recovery processing of append-only file-system
 * pages, we pad out the last file-system byte with zeroes. The number of
 * bytes that are padded with zero's is determined by safefswritesize.
 * This function pads with 0's of length padLen or pads the whole remainder
 * of the safefswritesize size with 0's if padLen is -1.
 */
static void
AppendOnlyStorageWrite_DoPadOutRemainder(AppendOnlyStorageWrite *storageWrite,
										 int32 padLen)
{
	int64		nextWritePosition;
	int64		nextBoundaryPosition;
	int32		safeWrite = storageWrite->storageAttributes.safeFSWriteSize;
	int32		safeWriteRemainder;
	bool		doPad;
	uint8	   *buffer;

	/* early exit if no pad needed */
	if (safeWrite == 0)
		return;

	nextWritePosition = BufferedAppendNextBufferPosition(&storageWrite->bufferedAppend);
	nextBoundaryPosition =
		((nextWritePosition + safeWrite - 1) / safeWrite) * safeWrite;
	safeWriteRemainder = (int32) (nextBoundaryPosition - nextWritePosition);

	if (safeWriteRemainder <= 0)
		doPad = false;
	else if (padLen == -1)
	{
		/*
		 * Pad to end of page.
		 */
		doPad = true;
	}
	else
		doPad = (safeWriteRemainder < padLen);

	if (doPad)
	{
		/*
		 * Get buffer of the remainder to pad.
		 */
		buffer = BufferedAppendGetBuffer(&storageWrite->bufferedAppend,
										 safeWriteRemainder);

		if (buffer == NULL)
		{
			ereport(ERROR,
					(errcode(ERRCODE_INTERNAL_ERROR),
					 errmsg("We do not expect files to be have a maximum length")));
		}

		memset(buffer, 0, safeWriteRemainder);
		BufferedAppendFinishBuffer(&storageWrite->bufferedAppend,
								   safeWriteRemainder,
								   safeWriteRemainder,
								   storageWrite->needsWAL);

		elogif(Debug_appendonly_print_insert, LOG,
			   "Append-only insert zero padded safeWriteRemainder for table '%s' (nextWritePosition = " INT64_FORMAT ", safeWriteRemainder = %d)",
			   storageWrite->relationName,
			   nextWritePosition,
			   safeWriteRemainder);
	}
}

/*
 * Flush and close the current segment file.
 *
 * No error if the current is already closed.
 *
 * The new EOF of the segment file is returend in *newLogicalEof.
 */
void
AppendOnlyStorageWrite_FlushAndCloseFile(
										 AppendOnlyStorageWrite *storageWrite,
										 int64 *newLogicalEof,
										 int64 *fileLen_uncompressed)
{
	Assert(storageWrite != NULL);
	Assert(storageWrite->isActive);

	if (storageWrite->file == -1)
	{
		*newLogicalEof = 0;
		*fileLen_uncompressed = 0;
		return;
	}

	/*
	 * We pad out append commands to the page boundary.
	 */
	AppendOnlyStorageWrite_DoPadOutRemainder(
											 storageWrite,
											  /* indicate till end of page */ -1);

	/*
	 * Have the BufferedAppend module let go, but this does not close the
	 * file.
	 */
	BufferedAppendCompleteFile(&storageWrite->bufferedAppend,
							   newLogicalEof,
							   fileLen_uncompressed,
							   storageWrite->needsWAL);

	/*
	 * We must take care of fsynching to disk ourselves since a fsync request
	 * is not enqueued for an AO segment file that is written to disk on
	 * primary.  Temp tables are not crash safe, no need to fsync them.
	 */
	if (!RelFileNodeBackendIsTemp(storageWrite->relFileNode) &&
		FileSync(storageWrite->file, WAIT_EVENT_DATA_FILE_SYNC) != 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("Could not flush (fsync) Append-Only segment file '%s' to disk for relation '%s': %m",
						storageWrite->segmentFileName,
						storageWrite->relationName)));

	FileClose(storageWrite->file);

	storageWrite->file = -1;
	storageWrite->formatVersion = -1;

	MemSet(&storageWrite->relFileNode, 0, sizeof(RelFileNode));
	storageWrite->segmentFileNum = 0;
}

/*
 * Flush and close the current segment file under a transaction.
 *
 * Handles mirror loss end transaction work.
 *
 * No error if the current is already closed.
 *
 * The new EOF of the segment file is returned in *newLogicalEof.
 */
void
AppendOnlyStorageWrite_TransactionFlushAndCloseFile(AppendOnlyStorageWrite *storageWrite,
													int64 *newLogicalEof,
													int64 *fileLen_uncompressed)
{
	Assert(storageWrite != NULL);
	Assert(storageWrite->isActive);

	if (storageWrite->file == -1)
	{
		*newLogicalEof = 0;
		*fileLen_uncompressed = 0;
		return;
	}

	AppendOnlyStorageWrite_FlushAndCloseFile(storageWrite,
											 newLogicalEof,
											 fileLen_uncompressed);
}


/*----------------------------------------------------------------
 * Usable Block Length
 *----------------------------------------------------------------
 */

/*
 * When writing "short" content intended to stay within the maxBufferLen (also
 * known as blocksize), some of the buffer will be used for the Append-Only
 * Block Header.  This function returns that overhead length.
 *
 * Isn't the length of the Append-Only Storage Block constant? NO.
 *
 * Currently, there are two things that can make it longer.  When checksums
 * are configured, we add checksum data to the header.  And there is optional
 * header data (e.g. firstRowNum).
 *
 * We call the header portion with the optional checksums the fixed header
 * because we need to be able to read and evaluate the checksums before we can
 * interpret flags in the fixed header that indicate there is more header
 * information.
 *
 * The complete header length is the fixed header plus optional information.
 */

/*
 * Returns the Append-Only Storage Block complete header length in bytes.
 *
 * Call this routine after specifying all optional header information for the current block
 * begin written.
 */
int32
AppendOnlyStorageWrite_CompleteHeaderLen(AppendOnlyStorageWrite *storageWrite,
										 AoHeaderKind aoHeaderKind)
{
	int32		completeHeaderLen;

	Assert(storageWrite != NULL);
	Assert(storageWrite->isActive);

	completeHeaderLen = storageWrite->regularHeaderLen;
	if (AoHeader_IsLong(aoHeaderKind))
	{
		completeHeaderLen += (AoHeader_LongSize - AoHeader_RegularSize);
	}

	if (storageWrite->isFirstRowNumSet)
		completeHeaderLen += sizeof(int64);

	return completeHeaderLen;
}

/*
 * Returns the Append-Only Storage large content metadata header length in
 * bytes.
 *
 * Call this routine after specifying all optional header information for the
 * current block begin written.
 */
static int32
AppendOnlyStorageWrite_LargeContentHeaderLen(AppendOnlyStorageWrite *storageWrite)
{
	int32		completeHeaderLen;

	Assert(storageWrite != NULL);
	Assert(storageWrite->isActive);

	completeHeaderLen = storageWrite->regularHeaderLen;

	if (storageWrite->isFirstRowNumSet)
		completeHeaderLen += sizeof(int64);

	/* UNDONE: Right alignment? */

	return completeHeaderLen;
}

char *
AppendOnlyStorageWrite_ContextStr(AppendOnlyStorageWrite *storageWrite)
{
	int64		headerOffsetInFile =
		BufferedAppendCurrentBufferPosition(&storageWrite->bufferedAppend);

	return psprintf("%s. Append-Only segment file '%s', header offset in file " INT64_FORMAT,
					storageWrite->title,
					storageWrite->segmentFileName,
					headerOffsetInFile);
}

static char *
AppendOnlyStorageWrite_BlockHeaderStr(AppendOnlyStorageWrite *storageWrite,
									  uint8 *header)
{
	return AppendOnlyStorageFormat_BlockHeaderStr(header,
												  storageWrite->storageAttributes.checksum,
												  storageWrite->formatVersion);
}


static void
AppendOnlyStorageWrite_LogBlockHeader(AppendOnlyStorageWrite *storageWrite,
									  int64 headerOffsetInFile,
									  uint8 *header)
{
	char	   *blockHeaderStr;

	blockHeaderStr = AppendOnlyStorageWrite_BlockHeaderStr(storageWrite,
														   header);
	ereport(LOG,
			(errmsg("%s. Append-Only segment file '%s', header offset in file " INT64_FORMAT ". %s",
					storageWrite->title,
					storageWrite->segmentFileName,
					headerOffsetInFile,
					blockHeaderStr)));

	pfree(blockHeaderStr);
}

/*----------------------------------------------------------------
 * errcontext and errdetail
 *----------------------------------------------------------------
 */

/*
 * errcontext_appendonly_write_storage_block
 *
 * Add an errcontext() line showing the table, segment file, offset in file,
 * block count of the storage block being read.
 */
static int
errcontext_appendonly_write_storage_block(AppendOnlyStorageWrite *storageWrite)
{
	int64		headerOffsetInFile;

	headerOffsetInFile = BufferedAppendCurrentBufferPosition(&storageWrite->bufferedAppend);

	errcontext("Append-Only table '%s', segment file '%s', block header offset in file = " INT64_FORMAT ", bufferCount " INT64_FORMAT ")",
			   storageWrite->relationName,
			   storageWrite->segmentFileName,
			   headerOffsetInFile,
			   storageWrite->bufferCount);

	return 0;
}

/*
 * errdetail_appendonly_write_storage_block_header
 *
 * Add an errdetail() line showing the Append-Only Storage header being written.
 */
static void
errdetail_appendonly_write_storage_block_header(AppendOnlyStorageWrite *storageWrite)
{
	uint8	   *header;
	bool		checksum;
	int			version;

	header = BufferedAppendGetCurrentBuffer(&storageWrite->bufferedAppend);
	checksum = storageWrite->storageAttributes.checksum;
	version = storageWrite->formatVersion;

	errdetail_appendonly_storage_smallcontent_header(header, checksum,
													 version);
}

/*----------------------------------------------------------------
 * Writing Small Content Efficiently that is not being Bulk Compressed
 *----------------------------------------------------------------
 */

/*
 * This section describes for writing content that is less than or equal to
 * the blocksize (e.g. 32k) bytes that is not being bulk compressed by the
 * Append-Only Storage Layer.
 *
 * Actually, the content is limited to blocksize minus the Append-Only header
 * size (see the AppendOnlyStorageWrite_HeaderLen routine).
 */

/*
 * Get a pointer to next maximum length buffer space for
 * appending small content.
 *
 * You must decide whether you are supplying the optional first row number
 * BEFORE calling this routine!
 *
 * NOTE: The maximum length buffer space =
 *				maxBufferLen +
 *				AppendOnlyStorageWrite_CompleteHeaderLen(...)
 *
 * When compression is not being used, this interface provides a pointer
 * directly into the write buffer for efficient data generation.  Otherwise,
 * a pointer to a temporary buffer to collect the uncompressed contents will
 * be provided.
 *
 * Returns NULL when the current file does not have enough room for another
 * buffer.
 */
uint8 *
AppendOnlyStorageWrite_GetBuffer(AppendOnlyStorageWrite *storageWrite,
								 int aoHeaderKind)
{
	Assert(storageWrite != NULL);
	Assert(storageWrite->isActive);

	Assert(aoHeaderKind == AoHeaderKind_SmallContent ||
		   aoHeaderKind == AoHeaderKind_NonBulkDenseContent ||
		   aoHeaderKind == AoHeaderKind_BulkDenseContent);

	storageWrite->getBufferAoHeaderKind = aoHeaderKind;

	/*
	 * Both headers (Small and NonBulkDense) have the same length. BulkDense
	 * is a long header.
	 */
	storageWrite->currentCompleteHeaderLen =
		AppendOnlyStorageWrite_CompleteHeaderLen(
												 storageWrite,
												 aoHeaderKind);


	/*
	 * If compression configured, the supply the temporary buffer instead.
	 */
	if (storageWrite->storageAttributes.compress)
	{
		storageWrite->currentBuffer = NULL;

		return storageWrite->uncompressedBuffer;
	}
	else
	{
		storageWrite->currentBuffer =
			BufferedAppendGetMaxBuffer(&storageWrite->bufferedAppend);

		return &storageWrite->currentBuffer[storageWrite->currentCompleteHeaderLen];
	}
}

/*
 * Test if a buffer is currently allocated.
 */
bool
AppendOnlyStorageWrite_IsBufferAllocated(AppendOnlyStorageWrite *storageWrite)
{
	Assert(storageWrite != NULL);
	Assert(storageWrite->isActive);

	return (storageWrite->currentCompleteHeaderLen > 0);
}

/*
 * Return the beginning of the last write position of the write buffer.
 */
int64
AppendOnlyStorageWrite_LogicalBlockStartOffset(AppendOnlyStorageWrite *storageWrite)
{
	return storageWrite->logicalBlockStartOffset;
}


static void
AppendOnlyStorageWrite_VerifyWriteBlock(AppendOnlyStorageWrite *storageWrite,
										int64 headerOffsetInFile,
										int32 bufferLen,
										uint8 *expectedContent,
										int32 expectedUncompressedLen,
										int expectedExecutorBlockKind,
										int expectedRowCount,
										int32 expectedCompressedLen)
{
	uint8	   *header;

	AOHeaderCheckError checkError;
	AoHeaderKind headerKind;
	int32		actualHeaderLen;
	int32		offset;
	int32		uncompressedLen = 0;
	bool		isCompressed = false;
	int32		overallBlockLen;
	int32		compressedLen = 0;
	int			executorBlockKind = 0;
	bool		hasFirstRowNum;
	int64		firstRowNum;
	int			rowCount = 0;
	pg_crc32	storedChecksum;
	pg_crc32	computedChecksum;

	if (storageWrite->storageAttributes.compress && storageWrite->verifyWriteBuffer == NULL)
		return;					/* GUC must have been turned on
								 * mid-transaction. */

	if (gp_appendonly_verify_write_block == false)
		elog(WARNING, "The GUC gp_appendonly_verify_write_block is false. Compressed write not checked.");

	header = BufferedAppendGetCurrentBuffer(&storageWrite->bufferedAppend);

	/*
	 * Code is similar to that in getNextStorageBlockInFile routine.
	 */

	/*----------
	 * Proceed very carefully:
	 * [ 1. Verify header checksum ]
	 *	 2. Examine (basic) header.
	 *	 3. Examine specific header.
	 * [ 4. Verify the block checksum ]
	 *----------
	 */
	if (storageWrite->storageAttributes.checksum)
	{
		if (!AppendOnlyStorageFormat_VerifyHeaderChecksum(
														  header,
														  &storedChecksum,
														  &computedChecksum))
			ereport(ERROR,
					(errmsg("Verify block during write found header checksum does not match.  Expected 0x%08X and found 0x%08X",
							storedChecksum,
							computedChecksum),
					 errdetail_appendonly_write_storage_block_header(storageWrite),
					 errcontext_appendonly_write_storage_block(storageWrite)));
	}

	/*
	 * Check the (basic) header information.
	 */
	checkError = AppendOnlyStorageFormat_GetHeaderInfo(
													   header,
													   storageWrite->storageAttributes.checksum,
													   &headerKind,
													   &actualHeaderLen);
	if (checkError != AOHeaderCheckOk)
		ereport(ERROR,
				(errmsg("Verify block during write found bad append-only storage header. Header check error %d, detail '%s'",
						(int) checkError,
						AppendOnlyStorageFormat_GetHeaderCheckErrorStr()),
				 errdetail_appendonly_write_storage_block_header(storageWrite),
				 errcontext_appendonly_write_storage_block(storageWrite)));

	switch (headerKind)
	{
		case AoHeaderKind_SmallContent:

			/*
			 * Check the Block header information.
			 */
			checkError = AppendOnlyStorageFormat_GetSmallContentHeaderInfo(header,
																		   actualHeaderLen,
																		   storageWrite->storageAttributes.checksum,
																		   bufferLen,
																		   &overallBlockLen,
																		   &offset, //Offset to data.
																		   & uncompressedLen,
																		   &executorBlockKind,
																		   &hasFirstRowNum,
																		   storageWrite->formatVersion,
																		   &firstRowNum,
																		   &rowCount,
																		   &isCompressed,
																		   &compressedLen);
			break;

		case AoHeaderKind_LargeContent:

			/*
			 * Check the LargeContent header information.
			 */
			checkError = AppendOnlyStorageFormat_GetLargeContentHeaderInfo(header,
																		   actualHeaderLen,
																		   storageWrite->storageAttributes.checksum,
																		   &uncompressedLen,
																		   &executorBlockKind,
																		   &hasFirstRowNum,
																		   &firstRowNum,
																		   &rowCount);
			break;
		case AoHeaderKind_NonBulkDenseContent:
			checkError = AppendOnlyStorageFormat_GetNonBulkDenseContentHeaderInfo(header,
																				  actualHeaderLen,
																				  storageWrite->storageAttributes.checksum,
																				  bufferLen,
																				  &overallBlockLen,
																				  &offset, //Offset to data.
																				  &uncompressedLen,
																				  &executorBlockKind,
																				  &hasFirstRowNum,
																				  storageWrite->formatVersion,
																				  &firstRowNum,
																				  &rowCount);
			break;
		case AoHeaderKind_BulkDenseContent:
			checkError = AppendOnlyStorageFormat_GetBulkDenseContentHeaderInfo(header,
																			   actualHeaderLen,
																			   storageWrite->storageAttributes.checksum,
																			   bufferLen,
																			   &overallBlockLen,
																			   &offset, //Offset to data.
																			   &uncompressedLen,
																			   &executorBlockKind,
																			   &hasFirstRowNum,
																			   storageWrite->formatVersion,
																			   &firstRowNum,
																			   &rowCount,
																			   &isCompressed,
																			   &compressedLen);
			break;
		default:
			elog(ERROR, "Unexpected Append-Only header kind %d",
				 headerKind);
			break;
	}
	if (checkError != AOHeaderCheckOk)
		ereport(ERROR,
				(errmsg("Verify block during write found bad append-only storage block header. Block type: %d "
						"Header check error %d, detail '%s'",
						headerKind,
						(int) checkError,
				   AppendOnlyStorageFormat_GetHeaderCheckErrorStr()),
		errdetail_appendonly_write_storage_block_header(storageWrite),
		   errcontext_appendonly_write_storage_block(storageWrite)));

	if (uncompressedLen != expectedUncompressedLen)
		ereport(ERROR,
				(errmsg("Verify block during write found append-only storage block header. "
					"DataLen %d does not equal expected length %d, ",
						uncompressedLen,
						expectedUncompressedLen),
		errdetail_appendonly_write_storage_block_header(storageWrite),
		   errcontext_appendonly_write_storage_block(storageWrite)));


	if (compressedLen != expectedCompressedLen)
		ereport(ERROR,
				(errmsg("Verify block during write found append-only storage block header. "
				"CompressedLen %d does not equal expected length %d",
						compressedLen,
						expectedCompressedLen),
		errdetail_appendonly_write_storage_block_header(storageWrite),
		   errcontext_appendonly_write_storage_block(storageWrite)));

	/*
	 * Now verify the executor portion of the block.
	 */

	if (executorBlockKind != expectedExecutorBlockKind)
		ereport(ERROR,
				(errmsg("Verify block during write found append-only storage block header. "
			"ExecutorBlockKind %d does not equal expected value %d.",
						executorBlockKind,
						expectedExecutorBlockKind),
		errdetail_appendonly_write_storage_block_header(storageWrite),
		   errcontext_appendonly_write_storage_block(storageWrite)));

	/* UNDONE: Check hasFirstRowNum */

	if (rowCount != expectedRowCount)
		ereport(ERROR,
				(errmsg("Verify block during write found append-only storage block header. "
					  "RowCount %d does not equal expected value %d",
						rowCount,
						expectedRowCount),
		errdetail_appendonly_write_storage_block_header(storageWrite),
		   errcontext_appendonly_write_storage_block(storageWrite)));

	if (Debug_appendonly_print_verify_write_block)
	{
		AppendOnlyStorageWrite_LogBlockHeader(storageWrite,
											  headerOffsetInFile,
											  header);
	}


	if (isCompressed)
	{
		int			test;
		PGFunction	decompressor;
		PGFunction *cfns = storageWrite->compression_functions;

		Assert(gp_appendonly_verify_write_block == true);
		Assert(storageWrite->verifyWriteCompressionState != NULL);

		if (cfns == NULL)
			ereport(ERROR,
					(errcode(ERRCODE_INTERNAL_ERROR),
					 errmsg("decompression information missing")));

		decompressor = cfns[COMPRESSION_DECOMPRESS];

		gp_decompress(&header[offset], //Compressed data in block.
					  compressedLen,
					  storageWrite->verifyWriteBuffer, //Temporary buffer to hold uncompressed data.
					  uncompressedLen,
					  decompressor,
					  storageWrite->verifyWriteCompressionState,
					  storageWrite->bufferCount);

		/*
		 * Compare.
		 */
		test = memcmp(expectedContent,
					  storageWrite->verifyWriteBuffer,
					  uncompressedLen);

		if (test != 0)
			ereport(ERROR,
					(errmsg("Verify block during write found decompress did not produce the exact same bits passed to compress! "
							"Memcmp result %d",
							test),
					 errdetail_appendonly_write_storage_block_header(storageWrite),
			errcontext_appendonly_write_storage_block(storageWrite)));

	}
	else
	{
		/* UNDONE: Do comparison here */
	}

#ifdef NeedCallBack
	if (executorBlockKind == AoExecutorBlockKind_VarBlock)
	{
		VarBlockCheckError varBlockCheckError;
		VarBlockReader varBlockReader;
		int			readerItemCount;

		varBlockCheckError = VarBlockIsValid(data, uncompressedLen);
		if (varBlockCheckError != VarBlockCheckOk)
			ereport(ERROR,
					(errmsg("Verify block during write found VarBlock is not valid "
							"Valid block check error %d, detail '%s'",
							varBlockCheckError,
							VarBlockGetCheckErrorStr()),
					 errdetail_appendonly_write_storage_block_header(storageWrite),
					 errcontext_appendonly_write_storage_block(storageWrite)));

		/*
		 * Now use the VarBlock module to extract the items out.
		 */
		VarBlockReaderInit(&varBlockReader,
						   data,
						   uncompressedLen);

		readerItemCount = VarBlockReaderItemCount(&varBlockReader);

		if (rowCount != readerItemCount)
		{
			ereport(ERROR,
					(errmsg("Verify block during write found row count %d in append-only storage header does not match VarBlock item count %d",
							rowCount,
							readerItemCount),
					 errdetail_appendonly_write_storage_block_header(storageWrite),
					 errcontext_appendonly_write_storage_block(storageWrite)));
		}
	}
#endif
}

static void
AppendOnlyStorageWrite_CompressAppend(AppendOnlyStorageWrite *storageWrite,
									  uint8 *sourceData,
									  int32 sourceLen,
									  int executorBlockKind,
									  int itemCount,
									  int32 *compressedLen,
									  int32 *bufferLen)
{
	uint8	   *header;
	uint8	   *dataBuffer;
	int32		dataBufferWithOverrrunLen;
	PGFunction *cfns = storageWrite->compression_functions;
	PGFunction	compressor;

	if (cfns == NULL)
		compressor = NULL;
	else
		compressor = cfns[COMPRESSION_COMPRESS];

	/* UNDONE: This can be a duplicate call... */
	storageWrite->currentCompleteHeaderLen =
		AppendOnlyStorageWrite_CompleteHeaderLen(
			storageWrite,
			storageWrite->getBufferAoHeaderKind);

	header = BufferedAppendGetMaxBuffer(&storageWrite->bufferedAppend);
	if (header == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_INTERNAL_ERROR),
				 errmsg("We do not expect files to be have a maximum length"),
				 errcontext_appendonly_write_storage_block(storageWrite)));

	dataBuffer = &header[storageWrite->currentCompleteHeaderLen];
	dataBufferWithOverrrunLen =
		storageWrite->maxBufferWithCompressionOverrrunLen
		- storageWrite->currentCompleteHeaderLen;

	/*
	 * Compress into the BufferedAppend buffer after the large header (and
	 * optional checksum, etc.
	 */
	gp_trycompress(sourceData,
					sourceLen,
					dataBuffer,
					dataBufferWithOverrrunLen,
					compressedLen,
					compressor,
					storageWrite->compressionState);

#ifdef FAULT_INJECTOR
	/* Simulate that compression is not possible if the fault is set. */
	if (FaultInjector_InjectFaultIfSet(
			"appendonly_skip_compression",
			DDLNotSpecified,
			"",
			storageWrite->relationName) == FaultInjectorTypeSkip)
	{
		*compressedLen = sourceLen + 1;
	}
#endif

	/*
	 * We always store the data compressed if the compressed length is less
	 * than the uncompressed length.
	 *
	 * TODO: this is a weak assumption. It doesn't account for the fact that
	 * it's not worth paying the CPU cost of decompression for a potentially
	 * trivial saving.
	 *
	 * The best solution to this seems to be to make the threshold at which we
	 * compress data user configurable.
	 */
	int32 dataLen = *compressedLen;
	if (*compressedLen >= sourceLen)
	{
		dataLen = sourceLen;
		memcpy(dataBuffer, sourceData, sourceLen);
		*compressedLen = 0;
	}
	int32 dataRoundedUpLen =
		AOStorage_RoundUp(dataLen, storageWrite->formatVersion);
	AOStorage_ZeroPad(dataBuffer, dataLen, dataRoundedUpLen);

	/* Make the header and compute the checksum if necessary. */
	switch (storageWrite->getBufferAoHeaderKind)
	{
		case AoHeaderKind_SmallContent:
			AppendOnlyStorageFormat_MakeSmallContentHeader
				(header,
				 storageWrite->storageAttributes.checksum,
				 storageWrite->isFirstRowNumSet,
				 storageWrite->formatVersion,
				 storageWrite->firstRowNum,
				 executorBlockKind,
				 itemCount,
				 sourceLen,
				 *compressedLen);
			break;

		case AoHeaderKind_BulkDenseContent:
			/*
			 * This header is used when compresslevel > 1 is specified with
			 * rle_type compression.  The sourceData is already encoded with
			 * RLE.  It is further compressed with bulk compression.
			 * Corresponding datumstream version is
			 * DatumStreamVersion_Dense_Enhanced.
			 */
			AppendOnlyStorageFormat_MakeBulkDenseContentHeader
				(header,
				 storageWrite->storageAttributes.checksum,
				 storageWrite->isFirstRowNumSet,
				 storageWrite->formatVersion,
				 storageWrite->firstRowNum,
				 executorBlockKind,
				 itemCount,
				 sourceLen,
				 *compressedLen);
			break;

		default:
			elog(ERROR, "unexpected Append-Only header kind %d",
				 storageWrite->getBufferAoHeaderKind);
			break;
	}

	if (Debug_appendonly_print_storage_headers)
	{
		AppendOnlyStorageWrite_LogBlockHeader(
			storageWrite,
			BufferedAppendCurrentBufferPosition(&storageWrite->bufferedAppend),
			header);
	}

	elogif(Debug_appendonly_print_insert, LOG,
		   "Append-only insert block for table '%s' stored %s "
		   "(segment file '%s', header offset in file " INT64_FORMAT ", "
		   "source length = %d, result length %d item count %d, block count "
		   INT64_FORMAT ")",
		   storageWrite->relationName,
		   (*compressedLen > 0) ? "compressed" : "uncompressed",
		   storageWrite->segmentFileName,
		   BufferedAppendCurrentBufferPosition(&storageWrite->bufferedAppend),
		   sourceLen,
		   dataLen,
		   itemCount,
		   storageWrite->bufferCount);

	*bufferLen = storageWrite->currentCompleteHeaderLen + dataRoundedUpLen;
}

/*
 * Mark the current buffer "small" buffer as finished.
 *
 * If compression is configured, we will try to compress the contents in
 * the temporary uncompressed buffer into the write buffer.
 *
 * The buffer can be scheduled for writing and reused.
 *
 * contentLen		- byte length of the content generated directly into the
 *					  buffer returned by AppendOnlyStorageWrite_GetBuffer.
 * executorBlockKind - A value defined externally by the executor that
 *					   describes in content stored in the Append-Only Storage
 *					   Block.
 * rowCount			-  number of rows stored in the content.
 */
void
AppendOnlyStorageWrite_FinishBuffer(AppendOnlyStorageWrite *storageWrite,
									int32 contentLen,
									int executorBlockKind,
									int rowCount)
{
	int64		headerOffsetInFile;
	int32		bufferLen;

	Assert(storageWrite != NULL);
	Assert(storageWrite->isActive);

	Assert(storageWrite->currentCompleteHeaderLen > 0);

	if (contentLen >
		storageWrite->maxBufferLen - storageWrite->currentCompleteHeaderLen)
		elog(ERROR,
			 "Append-only content too large AO storage block (table '%s', "
			 "content length = %d, maximum buffer length %d, complete header length %d, first row number is set %s)",
			 storageWrite->relationName,
			 contentLen,
			 storageWrite->maxBufferLen,
			 storageWrite->currentCompleteHeaderLen,
			 (storageWrite->isFirstRowNumSet ? "true" : "false"));


	headerOffsetInFile = BufferedAppendCurrentBufferPosition(&storageWrite->bufferedAppend);

	if (!storageWrite->storageAttributes.compress)
	{
		uint8	   *nonCompressedHeader;
		uint8	   *nonCompressedData;
		int32		dataRoundedUpLen;
		int32		uncompressedlen;

		nonCompressedHeader = storageWrite->currentBuffer;

		nonCompressedData = &nonCompressedHeader[storageWrite->currentCompleteHeaderLen];

		dataRoundedUpLen = AOStorage_RoundUp(contentLen, storageWrite->formatVersion);

		AOStorage_ZeroPad(
						  nonCompressedData,
						  contentLen,
						  dataRoundedUpLen);

		switch (storageWrite->getBufferAoHeaderKind)
		{
			case AoHeaderKind_SmallContent:

				/*
				 * Make the header and compute the checksum if necessary.
				 */
				AppendOnlyStorageFormat_MakeSmallContentHeader
					(nonCompressedHeader,
					 storageWrite->storageAttributes.checksum,
					 storageWrite->isFirstRowNumSet,
					 storageWrite->formatVersion,
					 storageWrite->firstRowNum,
					 executorBlockKind,
					 rowCount,
					 contentLen,
					  /* compressedLength */ 0);
				break;

			case AoHeaderKind_NonBulkDenseContent:

				/*
				 * Make the header and compute the checksum if necessary.
				 */
				AppendOnlyStorageFormat_MakeNonBulkDenseContentHeader
					(nonCompressedHeader,
					 storageWrite->storageAttributes.checksum,
					 storageWrite->isFirstRowNumSet,
					 storageWrite->formatVersion,
					 storageWrite->firstRowNum,
					 executorBlockKind,
					 rowCount,
					 contentLen);
				break;

			default:
				elog(ERROR, "Unexpected Append-Only header kind %d",
					 storageWrite->getBufferAoHeaderKind);
				break;
		}

		if (Debug_appendonly_print_storage_headers)
		{
			AppendOnlyStorageWrite_LogBlockHeader(storageWrite,
												  headerOffsetInFile,
												  nonCompressedHeader);
		}


		bufferLen = storageWrite->currentCompleteHeaderLen + dataRoundedUpLen;
		uncompressedlen = bufferLen;	/* since there's no compression.. */

		/*
		 * Just before finishing the AO Storage buffer with our non-compressed
		 * content, let's verify it.
		 */
		if (gp_appendonly_verify_write_block)
			AppendOnlyStorageWrite_VerifyWriteBlock(storageWrite,
													headerOffsetInFile,
													bufferLen,
													nonCompressedData,
													contentLen,
													executorBlockKind,
													rowCount,
													 /* expectedCompressedLen */ 0);

		BufferedAppendFinishBuffer(&storageWrite->bufferedAppend,
								   bufferLen,
								   uncompressedlen,
								   storageWrite->needsWAL);

		/* Declare it finished. */
		storageWrite->currentCompleteHeaderLen = 0;

		elogif(Debug_appendonly_print_insert, LOG,
			   "Append-only insert finished uncompressed block for table '%s' "
			   "(length = %d, executor block kind %d, item count %d, block count " INT64_FORMAT ")",
			   storageWrite->relationName,
			   contentLen,
			   executorBlockKind,
			   rowCount,
			   storageWrite->bufferCount);

	}
	else
	{
		int32		compressedLen = 0;

		AppendOnlyStorageWrite_CompressAppend(storageWrite,
											  storageWrite->uncompressedBuffer,
											  contentLen,
											  executorBlockKind,
											  rowCount,
											  &compressedLen,
											  &bufferLen);

		/*
		 * Just before finishing the AO Storage buffer with our non-compressed
		 * content, let's verify it.
		 */
		if (gp_appendonly_verify_write_block)
			AppendOnlyStorageWrite_VerifyWriteBlock(storageWrite,
													headerOffsetInFile,
													bufferLen,
													storageWrite->uncompressedBuffer,
													contentLen,
													executorBlockKind,
													rowCount,
													compressedLen);

		/*
		 * Finish the current buffer by specifying the used length.
		 */
		BufferedAppendFinishBuffer(&storageWrite->bufferedAppend,
								   bufferLen,
								   (storageWrite->currentCompleteHeaderLen +
								       AOStorage_RoundUp(contentLen, storageWrite->formatVersion) /* non-compressed size */ ),
								   storageWrite->needsWAL);
		/* Declare it finished. */
		storageWrite->currentCompleteHeaderLen = 0;
	}

	Assert(storageWrite->currentCompleteHeaderLen == 0);
	storageWrite->currentBuffer = NULL;
	storageWrite->isFirstRowNumSet = false;
}

/*
 * Cancel the last ~GetBuffer call.
 *
 * This will also turn off the firstRowNum flag.
 */
void
AppendOnlyStorageWrite_CancelLastBuffer(
										AppendOnlyStorageWrite *storageWrite)
{
	Assert(storageWrite != NULL);
	Assert(storageWrite->isActive);

	Assert(storageWrite->currentCompleteHeaderLen > 0);

	if (storageWrite->currentBuffer != NULL)
	{
		BufferedAppendCancelLastBuffer(&storageWrite->bufferedAppend);
		storageWrite->currentBuffer = NULL;
	}

	storageWrite->currentCompleteHeaderLen = 0;

	/*
	 * Since we don't know if AppendOnlyStorageWrite_Content will be called
	 * next or the writer is doing something else, let's turn off the
	 * firstRowNum flag.
	 */
	storageWrite->isFirstRowNumSet = false;
}

/*----------------------------------------------------------------
 * Writing "Large" Content
 *----------------------------------------------------------------
 */

/*
 * This section describes for writing long content that can be up to 1 Gb
 * long and/or content that will be bulk-compressed when configured.
 */


/*
 * Write content up to 1Gb.
 *
 * Large content will be writen in fragment blocks by the Append-Only
 * Storage Layer.
 *
 * If compression is configured, then the content will be compressed in
 * fragments.
 *
 * Returns NULL when the current file does not have enough room for another
 * buffer.
 *
 * content		- Content to store.  All contiguous.
 * contentLen	- byte length of the data to store.
 * executorBlockKind - a value defined externally by the executor that
 *					   describes in content stored in the Append-Only Storage
 *					   Block.
 * rowCount		- number of rows stored in the content.
 */
void
AppendOnlyStorageWrite_Content(AppendOnlyStorageWrite *storageWrite,
							   uint8 *content,
							   int32 contentLen,
							   int executorBlockKind,
							   int rowCount)
{
	int32		completeHeaderLen;
	int32		compressedLen;
	int32		bufferLen;
	uint8	   *data;

	Assert(storageWrite != NULL);
	Assert(storageWrite->isActive);

	completeHeaderLen =
		AppendOnlyStorageWrite_CompleteHeaderLen(storageWrite,
												 AoHeaderKind_SmallContent);
	if (contentLen <= storageWrite->maxBufferLen - completeHeaderLen)
	{
		/*
		 * This is "small" content.
		 */
		if (!storageWrite->storageAttributes.compress)
		{
			data = AppendOnlyStorageWrite_GetBuffer(storageWrite,
													AoHeaderKind_SmallContent);

			memcpy(data, content, contentLen);

			storageWrite->logicalBlockStartOffset =
				BufferedAppendNextBufferPosition(&(storageWrite->bufferedAppend));

			AppendOnlyStorageWrite_FinishBuffer(storageWrite,
												contentLen,
												executorBlockKind,
												rowCount);
			Assert(storageWrite->currentCompleteHeaderLen == 0);
		}
		else
		{
			/*
			 * Since ~_GetBuffer now takes in a specification of the header
			 * kind, we need to set the header kind so general routines like
			 * ~_CompressAppend will work correctly when writing the small
			 * "fragments
			 */
			storageWrite->getBufferAoHeaderKind = AoHeaderKind_SmallContent;
			AppendOnlyStorageWrite_CompressAppend(storageWrite,
												  content,
												  contentLen,
												  executorBlockKind,
												  rowCount,
												  &compressedLen,
												  &bufferLen);

			/*
			 * Just before finishing the AO Storage buffer with our
			 * non-compressed content, let's verify it.
			 */
			if (gp_appendonly_verify_write_block)
				AppendOnlyStorageWrite_VerifyWriteBlock(storageWrite,
														BufferedAppendCurrentBufferPosition(&storageWrite->bufferedAppend),
														bufferLen,
														content,
														contentLen,
														executorBlockKind,
														rowCount,
														compressedLen);

			storageWrite->logicalBlockStartOffset =
				BufferedAppendNextBufferPosition(&(storageWrite->bufferedAppend));

			/*
			 * Finish the current buffer by specifying the used length.
			 */
			BufferedAppendFinishBuffer(&storageWrite->bufferedAppend,
									   bufferLen,
									   (storageWrite->currentCompleteHeaderLen +
									       AOStorage_RoundUp(contentLen, storageWrite->formatVersion) /* non-compressed size */ ),
									   storageWrite->needsWAL);

			/* Declare it finished. */
			storageWrite->currentCompleteHeaderLen = 0;
		}
	}
	else
	{
		int32		largeContentHeaderLen;
		uint8	   *largeContentHeader;
		int32		smallContentHeaderLen;
		int32		maxSmallContentLen;
		int32		countdownContentLen;
		uint8	   *contentNext;
		int32		smallContentLen;

		/*
		 * Write the "Large" content in fragments.
		 */

		storageWrite->logicalBlockStartOffset =
			BufferedAppendNextBufferPosition(&(storageWrite->bufferedAppend));

		/*
		 * First is a LargeContent header-only block that has the "large"
		 * content length and "large" row count.
		 */
		largeContentHeaderLen = AppendOnlyStorageWrite_LargeContentHeaderLen(storageWrite);

		largeContentHeader =
			BufferedAppendGetBuffer(&storageWrite->bufferedAppend,
									largeContentHeaderLen);

		AppendOnlyStorageFormat_MakeLargeContentHeader(largeContentHeader,
													   storageWrite->storageAttributes.checksum,
													   storageWrite->isFirstRowNumSet,
													   storageWrite->formatVersion,
													   storageWrite->firstRowNum,
													   executorBlockKind,
													   rowCount,
													   contentLen);

		BufferedAppendFinishBuffer(&storageWrite->bufferedAppend,
								   largeContentHeaderLen,
								   largeContentHeaderLen,
								   storageWrite->needsWAL);

		/* Declare it finished. */
		storageWrite->currentCompleteHeaderLen = 0;

		/*
		 * Now write the fragments as type Block.
		 */
		storageWrite->isFirstRowNumSet = false; /* Not written with fragments. */

		smallContentHeaderLen =
			AppendOnlyStorageWrite_CompleteHeaderLen(storageWrite,
													 AoHeaderKind_SmallContent);
		maxSmallContentLen = storageWrite->maxBufferLen - smallContentHeaderLen;
		countdownContentLen = contentLen;
		contentNext = content;
		while (true)
		{
			if (countdownContentLen <= maxSmallContentLen)
				smallContentLen = countdownContentLen;
			else
				smallContentLen = maxSmallContentLen;

			if (!storageWrite->storageAttributes.compress)
			{
				data = AppendOnlyStorageWrite_GetBuffer(storageWrite,
														AoHeaderKind_SmallContent);

				memcpy(data, contentNext, smallContentLen);

				AppendOnlyStorageWrite_FinishBuffer(storageWrite,
													smallContentLen,
													executorBlockKind,
													 /* rowCount */ 0);
			}
			else
			{
				/*
				 * Since ~_GetBuffer now takes in a specification of the
				 * header kind, we need to set the header kind so general
				 * routines like ~_CompressAppend will work correctly when
				 * writing the small "fragments
				 */
				storageWrite->getBufferAoHeaderKind = AoHeaderKind_SmallContent;

				AppendOnlyStorageWrite_CompressAppend(storageWrite,
													  contentNext,
													  smallContentLen,
													  executorBlockKind,
													   /* rowCount */ 0,
													  &compressedLen,
													  &bufferLen);

				/*
				 * Just before finishing the AO Storage buffer with our
				 * non-compressed content, let's verify it.
				 */
				if (gp_appendonly_verify_write_block)
					AppendOnlyStorageWrite_VerifyWriteBlock(storageWrite,
															BufferedAppendCurrentBufferPosition(&storageWrite->bufferedAppend),
															bufferLen,
															contentNext,
															smallContentLen,
															executorBlockKind,
															 /* rowCount */ 0,
															compressedLen);

				/*
				 * Finish the current buffer by specifying the used length.
				 */
				BufferedAppendFinishBuffer(&storageWrite->bufferedAppend,
										   bufferLen,
										   (smallContentHeaderLen +
										   AOStorage_RoundUp(smallContentLen, storageWrite->formatVersion) /* non-compressed size */ ),
										   storageWrite->needsWAL);

				/* Declare it finished. */
				storageWrite->currentCompleteHeaderLen = 0;
			}

			countdownContentLen -= smallContentLen;
			if (countdownContentLen == 0)
				break;

			contentNext += smallContentLen;
		}
	}

	storageWrite->isFirstRowNumSet = false;

	/* Verify we have no buffer allocated. */
	Assert(storageWrite->currentCompleteHeaderLen == 0);
}

/*----------------------------------------------------------------
 * Optional: Set First Row Number
 *----------------------------------------------------------------
 */


/*
 * Normally, the first row of an Append-Only Storage Block is implicit. It
 * is the last row number of the previous block + 1. However, to support
 * BTree indexes that stored TIDs in shared-memory/disk before the
 * transaction commits, we may need to not reuse row numbers of aborted
 * transactions.  So, this routine tells the Append-Only Storage Layer to
 * explicitly keep the first row number. This will take up more header
 * overhead, so the AppendOnlyStorageWrite_HeaderLen routine should be
 * called afterwards to get the new overhead length.
 */


/*
 * Set the first row value for the next Append-Only Storage Block to be
 * written.  Only applies to the next block.
 */
void
AppendOnlyStorageWrite_SetFirstRowNum(AppendOnlyStorageWrite *storageWrite,
									  int64 firstRowNum)
{
	Assert(storageWrite != NULL);
	Assert(storageWrite->isActive);

	/* UNDONE: Range check firstRowNum */

	storageWrite->isFirstRowNumSet = true;
	storageWrite->firstRowNum = firstRowNum;
}

相关信息

greenplumn 源码目录

相关文章

greenplumn cdbappendonlystorageformat 源码

greenplumn cdbappendonlystorageread 源码

greenplumn cdbappendonlyxlog 源码

greenplumn cdbbufferedappend 源码

greenplumn cdbbufferedread 源码

greenplumn cdbcat 源码

greenplumn cdbcopy 源码

greenplumn cdbdistributedsnapshot 源码

greenplumn cdbdistributedxacts 源码

greenplumn cdbdistributedxid 源码

0  赞