greenplumn spgdoinsert 源码

  • 2022-08-18
  • 浏览 (430)

greenplumn spgdoinsert 代码

文件路径:/src/backend/access/spgist/spgdoinsert.c

/*-------------------------------------------------------------------------
 *
 * spgdoinsert.c
 *	  implementation of insert algorithm
 *
 *
 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *			src/backend/access/spgist/spgdoinsert.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/genam.h"
#include "access/spgist_private.h"
#include "access/spgxlog.h"
#include "access/xloginsert.h"
#include "miscadmin.h"
#include "storage/bufmgr.h"
#include "utils/rel.h"


/*
 * SPPageDesc tracks all info about a page we are inserting into.  In some
 * situations it actually identifies a tuple, or even a specific node within
 * an inner tuple.  But any of the fields can be invalid.  If the buffer
 * field is valid, it implies we hold pin and exclusive lock on that buffer.
 * page pointer should be valid exactly when buffer is.
 */
typedef struct SPPageDesc
{
	BlockNumber blkno;			/* block number, or InvalidBlockNumber */
	Buffer		buffer;			/* page's buffer number, or InvalidBuffer */
	Page		page;			/* pointer to page buffer, or NULL */
	OffsetNumber offnum;		/* offset of tuple, or InvalidOffsetNumber */
	int			node;			/* node number within inner tuple, or -1 */
} SPPageDesc;


/*
 * Set the item pointer in the nodeN'th entry in inner tuple tup.  This
 * is used to update the parent inner tuple's downlink after a move or
 * split operation.
 */
void
spgUpdateNodeLink(SpGistInnerTuple tup, int nodeN,
				  BlockNumber blkno, OffsetNumber offset)
{
	int			i;
	SpGistNodeTuple node;

	SGITITERATE(tup, i, node)
	{
		if (i == nodeN)
		{
			ItemPointerSet(&node->t_tid, blkno, offset);
			return;
		}
	}

	elog(ERROR, "failed to find requested node %d in SPGiST inner tuple",
		 nodeN);
}

/*
 * Form a new inner tuple containing one more node than the given one, with
 * the specified label datum, inserted at offset "offset" in the node array.
 * The new tuple's prefix is the same as the old one's.
 *
 * Note that the new node initially has an invalid downlink.  We'll find a
 * page to point it to later.
 */
static SpGistInnerTuple
addNode(SpGistState *state, SpGistInnerTuple tuple, Datum label, int offset)
{
	SpGistNodeTuple node,
			   *nodes;
	int			i;

	/* if offset is negative, insert at end */
	if (offset < 0)
		offset = tuple->nNodes;
	else if (offset > tuple->nNodes)
		elog(ERROR, "invalid offset for adding node to SPGiST inner tuple");

	nodes = palloc(sizeof(SpGistNodeTuple) * (tuple->nNodes + 1));
	SGITITERATE(tuple, i, node)
	{
		if (i < offset)
			nodes[i] = node;
		else
			nodes[i + 1] = node;
	}

	nodes[offset] = spgFormNodeTuple(state, label, false);

	return spgFormInnerTuple(state,
							 (tuple->prefixSize > 0),
							 SGITDATUM(tuple, state),
							 tuple->nNodes + 1,
							 nodes);
}

/* qsort comparator for sorting OffsetNumbers */
static int
cmpOffsetNumbers(const void *a, const void *b)
{
	if (*(const OffsetNumber *) a == *(const OffsetNumber *) b)
		return 0;
	return (*(const OffsetNumber *) a > *(const OffsetNumber *) b) ? 1 : -1;
}

/*
 * Delete multiple tuples from an index page, preserving tuple offset numbers.
 *
 * The first tuple in the given list is replaced with a dead tuple of type
 * "firststate" (REDIRECT/DEAD/PLACEHOLDER); the remaining tuples are replaced
 * with dead tuples of type "reststate".  If either firststate or reststate
 * is REDIRECT, blkno/offnum specify where to link to.
 *
 * NB: this is used during WAL replay, so beware of trying to make it too
 * smart.  In particular, it shouldn't use "state" except for calling
 * spgFormDeadTuple().  This is also used in a critical section, so no
 * pallocs either!
 */
void
spgPageIndexMultiDelete(SpGistState *state, Page page,
						OffsetNumber *itemnos, int nitems,
						int firststate, int reststate,
						BlockNumber blkno, OffsetNumber offnum)
{
	OffsetNumber firstItem;
	OffsetNumber sortednos[MaxIndexTuplesPerPage];
	SpGistDeadTuple tuple = NULL;
	int			i;

	if (nitems == 0)
		return;					/* nothing to do */

	/*
	 * For efficiency we want to use PageIndexMultiDelete, which requires the
	 * targets to be listed in sorted order, so we have to sort the itemnos
	 * array.  (This also greatly simplifies the math for reinserting the
	 * replacement tuples.)  However, we must not scribble on the caller's
	 * array, so we have to make a copy.
	 */
	memcpy(sortednos, itemnos, sizeof(OffsetNumber) * nitems);
	if (nitems > 1)
		qsort(sortednos, nitems, sizeof(OffsetNumber), cmpOffsetNumbers);

	PageIndexMultiDelete(page, sortednos, nitems);

	firstItem = itemnos[0];

	for (i = 0; i < nitems; i++)
	{
		OffsetNumber itemno = sortednos[i];
		int			tupstate;

		tupstate = (itemno == firstItem) ? firststate : reststate;
		if (tuple == NULL || tuple->tupstate != tupstate)
			tuple = spgFormDeadTuple(state, tupstate, blkno, offnum);

		if (PageAddItem(page, (Item) tuple, tuple->size,
						itemno, false, false) != itemno)
			elog(ERROR, "failed to add item of size %u to SPGiST index page",
				 tuple->size);

		if (tupstate == SPGIST_REDIRECT)
			SpGistPageGetOpaque(page)->nRedirection++;
		else if (tupstate == SPGIST_PLACEHOLDER)
			SpGistPageGetOpaque(page)->nPlaceholder++;
	}
}

/*
 * Update the parent inner tuple's downlink, and mark the parent buffer
 * dirty (this must be the last change to the parent page in the current
 * WAL action).
 */
static void
saveNodeLink(Relation index, SPPageDesc *parent,
			 BlockNumber blkno, OffsetNumber offnum)
{
	SpGistInnerTuple innerTuple;

	innerTuple = (SpGistInnerTuple) PageGetItem(parent->page,
												PageGetItemId(parent->page, parent->offnum));

	spgUpdateNodeLink(innerTuple, parent->node, blkno, offnum);

	MarkBufferDirty(parent->buffer);
}

/*
 * Add a leaf tuple to a leaf page where there is known to be room for it
 */
static void
addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple,
			 SPPageDesc *current, SPPageDesc *parent, bool isNulls, bool isNew)
{
	spgxlogAddLeaf xlrec;

	xlrec.newPage = isNew;
	xlrec.storesNulls = isNulls;

	/* these will be filled below as needed */
	xlrec.offnumLeaf = InvalidOffsetNumber;
	xlrec.offnumHeadLeaf = InvalidOffsetNumber;
	xlrec.offnumParent = InvalidOffsetNumber;
	xlrec.nodeI = 0;

	START_CRIT_SECTION();

	if (current->offnum == InvalidOffsetNumber ||
		SpGistBlockIsRoot(current->blkno))
	{
		/* Tuple is not part of a chain */
		leafTuple->nextOffset = InvalidOffsetNumber;
		current->offnum = SpGistPageAddNewItem(state, current->page,
											   (Item) leafTuple, leafTuple->size,
											   NULL, false);

		xlrec.offnumLeaf = current->offnum;

		/* Must update parent's downlink if any */
		if (parent->buffer != InvalidBuffer)
		{
			xlrec.offnumParent = parent->offnum;
			xlrec.nodeI = parent->node;

			saveNodeLink(index, parent, current->blkno, current->offnum);
		}
	}
	else
	{
		/*
		 * Tuple must be inserted into existing chain.  We mustn't change the
		 * chain's head address, but we don't need to chase the entire chain
		 * to put the tuple at the end; we can insert it second.
		 *
		 * Also, it's possible that the "chain" consists only of a DEAD tuple,
		 * in which case we should replace the DEAD tuple in-place.
		 */
		SpGistLeafTuple head;
		OffsetNumber offnum;

		head = (SpGistLeafTuple) PageGetItem(current->page,
											 PageGetItemId(current->page, current->offnum));
		if (head->tupstate == SPGIST_LIVE)
		{
			leafTuple->nextOffset = head->nextOffset;
			offnum = SpGistPageAddNewItem(state, current->page,
										  (Item) leafTuple, leafTuple->size,
										  NULL, false);

			/*
			 * re-get head of list because it could have been moved on page,
			 * and set new second element
			 */
			head = (SpGistLeafTuple) PageGetItem(current->page,
												 PageGetItemId(current->page, current->offnum));
			head->nextOffset = offnum;

			xlrec.offnumLeaf = offnum;
			xlrec.offnumHeadLeaf = current->offnum;
		}
		else if (head->tupstate == SPGIST_DEAD)
		{
			leafTuple->nextOffset = InvalidOffsetNumber;
			PageIndexTupleDelete(current->page, current->offnum);
			if (PageAddItem(current->page,
							(Item) leafTuple, leafTuple->size,
							current->offnum, false, false) != current->offnum)
				elog(ERROR, "failed to add item of size %u to SPGiST index page",
					 leafTuple->size);

			/* WAL replay distinguishes this case by equal offnums */
			xlrec.offnumLeaf = current->offnum;
			xlrec.offnumHeadLeaf = current->offnum;
		}
		else
			elog(ERROR, "unexpected SPGiST tuple state: %d", head->tupstate);
	}

	MarkBufferDirty(current->buffer);

	if (RelationNeedsWAL(index) && !state->isBuild)
	{
		XLogRecPtr	recptr;
		int			flags;

		XLogBeginInsert();
		XLogRegisterData((char *) &xlrec, sizeof(xlrec));
		XLogRegisterData((char *) leafTuple, leafTuple->size);

		flags = REGBUF_STANDARD;
		if (xlrec.newPage)
			flags |= REGBUF_WILL_INIT;
		XLogRegisterBuffer(0, current->buffer, flags);
		if (xlrec.offnumParent != InvalidOffsetNumber)
			XLogRegisterBuffer(1, parent->buffer, REGBUF_STANDARD);

		recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_LEAF);

		PageSetLSN(current->page, recptr);

		/* update parent only if we actually changed it */
		if (xlrec.offnumParent != InvalidOffsetNumber)
		{
			PageSetLSN(parent->page, recptr);
		}
	}

	END_CRIT_SECTION();
}

/*
 * Count the number and total size of leaf tuples in the chain starting at
 * current->offnum.  Return number into *nToSplit and total size as function
 * result.
 *
 * Klugy special case when considering the root page (i.e., root is a leaf
 * page, but we're about to split for the first time): return fake large
 * values to force spgdoinsert() to take the doPickSplit rather than
 * moveLeafs code path.  moveLeafs is not prepared to deal with root page.
 */
static int
checkSplitConditions(Relation index, SpGistState *state,
					 SPPageDesc *current, int *nToSplit)
{
	int			i,
				n = 0,
				totalSize = 0;

	if (SpGistBlockIsRoot(current->blkno))
	{
		/* return impossible values to force split */
		*nToSplit = BLCKSZ;
		return BLCKSZ;
	}

	i = current->offnum;
	while (i != InvalidOffsetNumber)
	{
		SpGistLeafTuple it;

		Assert(i >= FirstOffsetNumber &&
			   i <= PageGetMaxOffsetNumber(current->page));
		it = (SpGistLeafTuple) PageGetItem(current->page,
										   PageGetItemId(current->page, i));
		if (it->tupstate == SPGIST_LIVE)
		{
			n++;
			totalSize += it->size + sizeof(ItemIdData);
		}
		else if (it->tupstate == SPGIST_DEAD)
		{
			/* We could see a DEAD tuple as first/only chain item */
			Assert(i == current->offnum);
			Assert(it->nextOffset == InvalidOffsetNumber);
			/* Don't count it in result, because it won't go to other page */
		}
		else
			elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);

		i = it->nextOffset;
	}

	*nToSplit = n;

	return totalSize;
}

/*
 * current points to a leaf-tuple chain that we wanted to add newLeafTuple to,
 * but the chain has to be moved because there's not enough room to add
 * newLeafTuple to its page.  We use this method when the chain contains
 * very little data so a split would be inefficient.  We are sure we can
 * fit the chain plus newLeafTuple on one other page.
 */
static void
moveLeafs(Relation index, SpGistState *state,
		  SPPageDesc *current, SPPageDesc *parent,
		  SpGistLeafTuple newLeafTuple, bool isNulls)
{
	int			i,
				nDelete,
				nInsert,
				size;
	Buffer		nbuf;
	Page		npage;
	SpGistLeafTuple it;
	OffsetNumber r = InvalidOffsetNumber,
				startOffset = InvalidOffsetNumber;
	bool		replaceDead = false;
	OffsetNumber *toDelete;
	OffsetNumber *toInsert;
	BlockNumber nblkno;
	spgxlogMoveLeafs xlrec;
	char	   *leafdata,
			   *leafptr;

	/* This doesn't work on root page */
	Assert(parent->buffer != InvalidBuffer);
	Assert(parent->buffer != current->buffer);

	/* Locate the tuples to be moved, and count up the space needed */
	i = PageGetMaxOffsetNumber(current->page);
	toDelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * i);
	toInsert = (OffsetNumber *) palloc(sizeof(OffsetNumber) * (i + 1));

	size = newLeafTuple->size + sizeof(ItemIdData);

	nDelete = 0;
	i = current->offnum;
	while (i != InvalidOffsetNumber)
	{
		SpGistLeafTuple it;

		Assert(i >= FirstOffsetNumber &&
			   i <= PageGetMaxOffsetNumber(current->page));
		it = (SpGistLeafTuple) PageGetItem(current->page,
										   PageGetItemId(current->page, i));

		if (it->tupstate == SPGIST_LIVE)
		{
			toDelete[nDelete] = i;
			size += it->size + sizeof(ItemIdData);
			nDelete++;
		}
		else if (it->tupstate == SPGIST_DEAD)
		{
			/* We could see a DEAD tuple as first/only chain item */
			Assert(i == current->offnum);
			Assert(it->nextOffset == InvalidOffsetNumber);
			/* We don't want to move it, so don't count it in size */
			toDelete[nDelete] = i;
			nDelete++;
			replaceDead = true;
		}
		else
			elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);

		i = it->nextOffset;
	}

	/* Find a leaf page that will hold them */
	nbuf = SpGistGetBuffer(index, GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
						   size, &xlrec.newPage);
	npage = BufferGetPage(nbuf);
	nblkno = BufferGetBlockNumber(nbuf);
	Assert(nblkno != current->blkno);

	leafdata = leafptr = palloc(size);

	START_CRIT_SECTION();

	/* copy all the old tuples to new page, unless they're dead */
	nInsert = 0;
	if (!replaceDead)
	{
		for (i = 0; i < nDelete; i++)
		{
			it = (SpGistLeafTuple) PageGetItem(current->page,
											   PageGetItemId(current->page, toDelete[i]));
			Assert(it->tupstate == SPGIST_LIVE);

			/*
			 * Update chain link (notice the chain order gets reversed, but we
			 * don't care).  We're modifying the tuple on the source page
			 * here, but it's okay since we're about to delete it.
			 */
			it->nextOffset = r;

			r = SpGistPageAddNewItem(state, npage, (Item) it, it->size,
									 &startOffset, false);

			toInsert[nInsert] = r;
			nInsert++;

			/* save modified tuple into leafdata as well */
			memcpy(leafptr, it, it->size);
			leafptr += it->size;
		}
	}

	/* add the new tuple as well */
	newLeafTuple->nextOffset = r;
	r = SpGistPageAddNewItem(state, npage,
							 (Item) newLeafTuple, newLeafTuple->size,
							 &startOffset, false);
	toInsert[nInsert] = r;
	nInsert++;
	memcpy(leafptr, newLeafTuple, newLeafTuple->size);
	leafptr += newLeafTuple->size;

	/*
	 * Now delete the old tuples, leaving a redirection pointer behind for the
	 * first one, unless we're doing an index build; in which case there can't
	 * be any concurrent scan so we need not provide a redirect.
	 */
	spgPageIndexMultiDelete(state, current->page, toDelete, nDelete,
							state->isBuild ? SPGIST_PLACEHOLDER : SPGIST_REDIRECT,
							SPGIST_PLACEHOLDER,
							nblkno, r);

	/* Update parent's downlink and mark parent page dirty */
	saveNodeLink(index, parent, nblkno, r);

	/* Mark the leaf pages too */
	MarkBufferDirty(current->buffer);
	MarkBufferDirty(nbuf);

	if (RelationNeedsWAL(index) && !state->isBuild)
	{
		XLogRecPtr	recptr;

		/* prepare WAL info */
		STORE_STATE(state, xlrec.stateSrc);

		xlrec.nMoves = nDelete;
		xlrec.replaceDead = replaceDead;
		xlrec.storesNulls = isNulls;

		xlrec.offnumParent = parent->offnum;
		xlrec.nodeI = parent->node;

		XLogBeginInsert();
		XLogRegisterData((char *) &xlrec, SizeOfSpgxlogMoveLeafs);
		XLogRegisterData((char *) toDelete,
						 sizeof(OffsetNumber) * nDelete);
		XLogRegisterData((char *) toInsert,
						 sizeof(OffsetNumber) * nInsert);
		XLogRegisterData((char *) leafdata, leafptr - leafdata);

		XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD);
		XLogRegisterBuffer(1, nbuf, REGBUF_STANDARD | (xlrec.newPage ? REGBUF_WILL_INIT : 0));
		XLogRegisterBuffer(2, parent->buffer, REGBUF_STANDARD);

		recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_MOVE_LEAFS);

		PageSetLSN(current->page, recptr);
		PageSetLSN(npage, recptr);
		PageSetLSN(parent->page, recptr);
	}

	END_CRIT_SECTION();

	/* Update local free-space cache and release new buffer */
	SpGistSetLastUsedPage(index, nbuf);
	UnlockReleaseBuffer(nbuf);
}

/*
 * Update previously-created redirection tuple with appropriate destination
 *
 * We use this when it's not convenient to know the destination first.
 * The tuple should have been made with the "impossible" destination of
 * the metapage.
 */
static void
setRedirectionTuple(SPPageDesc *current, OffsetNumber position,
					BlockNumber blkno, OffsetNumber offnum)
{
	SpGistDeadTuple dt;

	dt = (SpGistDeadTuple) PageGetItem(current->page,
									   PageGetItemId(current->page, position));
	Assert(dt->tupstate == SPGIST_REDIRECT);
	Assert(ItemPointerGetBlockNumber(&dt->pointer) == SPGIST_METAPAGE_BLKNO);
	ItemPointerSet(&dt->pointer, blkno, offnum);
}

/*
 * Test to see if the user-defined picksplit function failed to do its job,
 * ie, it put all the leaf tuples into the same node.
 * If so, randomly divide the tuples into several nodes (all with the same
 * label) and return true to select allTheSame mode for this inner tuple.
 *
 * (This code is also used to forcibly select allTheSame mode for nulls.)
 *
 * If we know that the leaf tuples wouldn't all fit on one page, then we
 * exclude the last tuple (which is the incoming new tuple that forced a split)
 * from the check to see if more than one node is used.  The reason for this
 * is that if the existing tuples are put into only one chain, then even if
 * we move them all to an empty page, there would still not be room for the
 * new tuple, so we'd get into an infinite loop of picksplit attempts.
 * Forcing allTheSame mode dodges this problem by ensuring the old tuples will
 * be split across pages.  (Exercise for the reader: figure out why this
 * fixes the problem even when there is only one old tuple.)
 */
static bool
checkAllTheSame(spgPickSplitIn *in, spgPickSplitOut *out, bool tooBig,
				bool *includeNew)
{
	int			theNode;
	int			limit;
	int			i;

	/* For the moment, assume we can include the new leaf tuple */
	*includeNew = true;

	/* If there's only the new leaf tuple, don't select allTheSame mode */
	if (in->nTuples <= 1)
		return false;

	/* If tuple set doesn't fit on one page, ignore the new tuple in test */
	limit = tooBig ? in->nTuples - 1 : in->nTuples;

	/* Check to see if more than one node is populated */
	theNode = out->mapTuplesToNodes[0];
	for (i = 1; i < limit; i++)
	{
		if (out->mapTuplesToNodes[i] != theNode)
			return false;
	}

	/* Nope, so override the picksplit function's decisions */

	/* If the new tuple is in its own node, it can't be included in split */
	if (tooBig && out->mapTuplesToNodes[in->nTuples - 1] != theNode)
		*includeNew = false;

	out->nNodes = 8;			/* arbitrary number of child nodes */

	/* Random assignment of tuples to nodes (note we include new tuple) */
	for (i = 0; i < in->nTuples; i++)
		out->mapTuplesToNodes[i] = i % out->nNodes;

	/* The opclass may not use node labels, but if it does, duplicate 'em */
	if (out->nodeLabels)
	{
		Datum		theLabel = out->nodeLabels[theNode];

		out->nodeLabels = (Datum *) palloc(sizeof(Datum) * out->nNodes);
		for (i = 0; i < out->nNodes; i++)
			out->nodeLabels[i] = theLabel;
	}

	/* We don't touch the prefix or the leaf tuple datum assignments */

	return true;
}

/*
 * current points to a leaf-tuple chain that we wanted to add newLeafTuple to,
 * but the chain has to be split because there's not enough room to add
 * newLeafTuple to its page.
 *
 * This function splits the leaf tuple set according to picksplit's rules,
 * creating one or more new chains that are spread across the current page
 * and an additional leaf page (we assume that two leaf pages will be
 * sufficient).  A new inner tuple is created, and the parent downlink
 * pointer is updated to point to that inner tuple instead of the leaf chain.
 *
 * On exit, current contains the address of the new inner tuple.
 *
 * Returns true if we successfully inserted newLeafTuple during this function,
 * false if caller still has to do it (meaning another picksplit operation is
 * probably needed).  Failure could occur if the picksplit result is fairly
 * unbalanced, or if newLeafTuple is just plain too big to fit on a page.
 * Because we force the picksplit result to be at least two chains, each
 * cycle will get rid of at least one leaf tuple from the chain, so the loop
 * will eventually terminate if lack of balance is the issue.  If the tuple
 * is too big, we assume that repeated picksplit operations will eventually
 * make it small enough by repeated prefix-stripping.  A broken opclass could
 * make this an infinite loop, though.
 */
static bool
doPickSplit(Relation index, SpGistState *state,
			SPPageDesc *current, SPPageDesc *parent,
			SpGistLeafTuple newLeafTuple,
			int level, bool isNulls, bool isNew)
{
	bool		insertedNew = false;
	spgPickSplitIn in;
	spgPickSplitOut out;
	FmgrInfo   *procinfo;
	bool		includeNew;
	int			i,
				max,
				n;
	SpGistInnerTuple innerTuple;
	SpGistNodeTuple node,
			   *nodes;
	Buffer		newInnerBuffer,
				newLeafBuffer;
	ItemPointerData *heapPtrs;
	uint8	   *leafPageSelect;
	int		   *leafSizes;
	OffsetNumber *toDelete;
	OffsetNumber *toInsert;
	OffsetNumber redirectTuplePos = InvalidOffsetNumber;
	OffsetNumber startOffsets[2];
	SpGistLeafTuple *newLeafs;
	int			spaceToDelete;
	int			currentFreeSpace;
	int			totalLeafSizes;
	bool		allTheSame;
	spgxlogPickSplit xlrec;
	char	   *leafdata,
			   *leafptr;
	SPPageDesc	saveCurrent;
	int			nToDelete,
				nToInsert,
				maxToInclude;

	in.level = level;

	/*
	 * Allocate per-leaf-tuple work arrays with max possible size
	 */
	max = PageGetMaxOffsetNumber(current->page);
	n = max + 1;
	in.datums = (Datum *) palloc(sizeof(Datum) * n);
	heapPtrs = (ItemPointerData *) palloc(sizeof(ItemPointerData) * n);
	toDelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n);
	toInsert = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n);
	newLeafs = (SpGistLeafTuple *) palloc(sizeof(SpGistLeafTuple) * n);
	leafPageSelect = (uint8 *) palloc(sizeof(uint8) * n);

	STORE_STATE(state, xlrec.stateSrc);

	/*
	 * Form list of leaf tuples which will be distributed as split result;
	 * also, count up the amount of space that will be freed from current.
	 * (Note that in the non-root case, we won't actually delete the old
	 * tuples, only replace them with redirects or placeholders.)
	 *
	 * Note: the SGLTDATUM calls here are safe even when dealing with a nulls
	 * page.  For a pass-by-value data type we will fetch a word that must
	 * exist even though it may contain garbage (because of the fact that leaf
	 * tuples must have size at least SGDTSIZE).  For a pass-by-reference type
	 * we are just computing a pointer that isn't going to get dereferenced.
	 * So it's not worth guarding the calls with isNulls checks.
	 */
	nToInsert = 0;
	nToDelete = 0;
	spaceToDelete = 0;
	if (SpGistBlockIsRoot(current->blkno))
	{
		/*
		 * We are splitting the root (which up to now is also a leaf page).
		 * Its tuples are not linked, so scan sequentially to get them all. We
		 * ignore the original value of current->offnum.
		 */
		for (i = FirstOffsetNumber; i <= max; i++)
		{
			SpGistLeafTuple it;

			it = (SpGistLeafTuple) PageGetItem(current->page,
											   PageGetItemId(current->page, i));
			if (it->tupstate == SPGIST_LIVE)
			{
				in.datums[nToInsert] = SGLTDATUM(it, state);
				heapPtrs[nToInsert] = it->heapPtr;
				nToInsert++;
				toDelete[nToDelete] = i;
				nToDelete++;
				/* we will delete the tuple altogether, so count full space */
				spaceToDelete += it->size + sizeof(ItemIdData);
			}
			else				/* tuples on root should be live */
				elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
		}
	}
	else
	{
		/* Normal case, just collect the leaf tuples in the chain */
		i = current->offnum;
		while (i != InvalidOffsetNumber)
		{
			SpGistLeafTuple it;

			Assert(i >= FirstOffsetNumber && i <= max);
			it = (SpGistLeafTuple) PageGetItem(current->page,
											   PageGetItemId(current->page, i));
			if (it->tupstate == SPGIST_LIVE)
			{
				in.datums[nToInsert] = SGLTDATUM(it, state);
				heapPtrs[nToInsert] = it->heapPtr;
				nToInsert++;
				toDelete[nToDelete] = i;
				nToDelete++;
				/* we will not delete the tuple, only replace with dead */
				Assert(it->size >= SGDTSIZE);
				spaceToDelete += it->size - SGDTSIZE;
			}
			else if (it->tupstate == SPGIST_DEAD)
			{
				/* We could see a DEAD tuple as first/only chain item */
				Assert(i == current->offnum);
				Assert(it->nextOffset == InvalidOffsetNumber);
				toDelete[nToDelete] = i;
				nToDelete++;
				/* replacing it with redirect will save no space */
			}
			else
				elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);

			i = it->nextOffset;
		}
	}
	in.nTuples = nToInsert;

	/*
	 * We may not actually insert new tuple because another picksplit may be
	 * necessary due to too large value, but we will try to allocate enough
	 * space to include it; and in any case it has to be included in the input
	 * for the picksplit function.  So don't increment nToInsert yet.
	 */
	in.datums[in.nTuples] = SGLTDATUM(newLeafTuple, state);
	heapPtrs[in.nTuples] = newLeafTuple->heapPtr;
	in.nTuples++;

	memset(&out, 0, sizeof(out));

	if (!isNulls)
	{
		/*
		 * Perform split using user-defined method.
		 */
		procinfo = index_getprocinfo(index, 1, SPGIST_PICKSPLIT_PROC);
		FunctionCall2Coll(procinfo,
						  index->rd_indcollation[0],
						  PointerGetDatum(&in),
						  PointerGetDatum(&out));

		/*
		 * Form new leaf tuples and count up the total space needed.
		 */
		totalLeafSizes = 0;
		for (i = 0; i < in.nTuples; i++)
		{
			newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i,
										   out.leafTupleDatums[i],
										   false);
			totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
		}
	}
	else
	{
		/*
		 * Perform dummy split that puts all tuples into one node.
		 * checkAllTheSame will override this and force allTheSame mode.
		 */
		out.hasPrefix = false;
		out.nNodes = 1;
		out.nodeLabels = NULL;
		out.mapTuplesToNodes = palloc0(sizeof(int) * in.nTuples);

		/*
		 * Form new leaf tuples and count up the total space needed.
		 */
		totalLeafSizes = 0;
		for (i = 0; i < in.nTuples; i++)
		{
			newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i,
										   (Datum) 0,
										   true);
			totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
		}
	}

	/*
	 * Check to see if the picksplit function failed to separate the values,
	 * ie, it put them all into the same child node.  If so, select allTheSame
	 * mode and create a random split instead.  See comments for
	 * checkAllTheSame as to why we need to know if the new leaf tuples could
	 * fit on one page.
	 */
	allTheSame = checkAllTheSame(&in, &out,
								 totalLeafSizes > SPGIST_PAGE_CAPACITY,
								 &includeNew);

	/*
	 * If checkAllTheSame decided we must exclude the new tuple, don't
	 * consider it any further.
	 */
	if (includeNew)
		maxToInclude = in.nTuples;
	else
	{
		maxToInclude = in.nTuples - 1;
		totalLeafSizes -= newLeafs[in.nTuples - 1]->size + sizeof(ItemIdData);
	}

	/*
	 * Allocate per-node work arrays.  Since checkAllTheSame could replace
	 * out.nNodes with a value larger than the number of tuples on the input
	 * page, we can't allocate these arrays before here.
	 */
	nodes = (SpGistNodeTuple *) palloc(sizeof(SpGistNodeTuple) * out.nNodes);
	leafSizes = (int *) palloc0(sizeof(int) * out.nNodes);

	/*
	 * Form nodes of inner tuple and inner tuple itself
	 */
	for (i = 0; i < out.nNodes; i++)
	{
		Datum		label = (Datum) 0;
		bool		labelisnull = (out.nodeLabels == NULL);

		if (!labelisnull)
			label = out.nodeLabels[i];
		nodes[i] = spgFormNodeTuple(state, label, labelisnull);
	}
	innerTuple = spgFormInnerTuple(state,
								   out.hasPrefix, out.prefixDatum,
								   out.nNodes, nodes);
	innerTuple->allTheSame = allTheSame;

	/*
	 * Update nodes[] array to point into the newly formed innerTuple, so that
	 * we can adjust their downlinks below.
	 */
	SGITITERATE(innerTuple, i, node)
	{
		nodes[i] = node;
	}

	/*
	 * Re-scan new leaf tuples and count up the space needed under each node.
	 */
	for (i = 0; i < maxToInclude; i++)
	{
		n = out.mapTuplesToNodes[i];
		if (n < 0 || n >= out.nNodes)
			elog(ERROR, "inconsistent result of SPGiST picksplit function");
		leafSizes[n] += newLeafs[i]->size + sizeof(ItemIdData);
	}

	/*
	 * To perform the split, we must insert a new inner tuple, which can't go
	 * on a leaf page; and unless we are splitting the root page, we must then
	 * update the parent tuple's downlink to point to the inner tuple.  If
	 * there is room, we'll put the new inner tuple on the same page as the
	 * parent tuple, otherwise we need another non-leaf buffer. But if the
	 * parent page is the root, we can't add the new inner tuple there,
	 * because the root page must have only one inner tuple.
	 */
	xlrec.initInner = false;
	if (parent->buffer != InvalidBuffer &&
		!SpGistBlockIsRoot(parent->blkno) &&
		(SpGistPageGetFreeSpace(parent->page, 1) >=
		 innerTuple->size + sizeof(ItemIdData)))
	{
		/* New inner tuple will fit on parent page */
		newInnerBuffer = parent->buffer;
	}
	else if (parent->buffer != InvalidBuffer)
	{
		/* Send tuple to page with next triple parity (see README) */
		newInnerBuffer = SpGistGetBuffer(index,
										 GBUF_INNER_PARITY(parent->blkno + 1) |
										 (isNulls ? GBUF_NULLS : 0),
										 innerTuple->size + sizeof(ItemIdData),
										 &xlrec.initInner);
	}
	else
	{
		/* Root page split ... inner tuple will go to root page */
		newInnerBuffer = InvalidBuffer;
	}

	/*
	 * The new leaf tuples converted from the existing ones should require the
	 * same or less space, and therefore should all fit onto one page
	 * (although that's not necessarily the current page, since we can't
	 * delete the old tuples but only replace them with placeholders).
	 * However, the incoming new tuple might not also fit, in which case we
	 * might need another picksplit cycle to reduce it some more.
	 *
	 * If there's not room to put everything back onto the current page, then
	 * we decide on a per-node basis which tuples go to the new page. (We do
	 * it like that because leaf tuple chains can't cross pages, so we must
	 * place all leaf tuples belonging to the same parent node on the same
	 * page.)
	 *
	 * If we are splitting the root page (turning it from a leaf page into an
	 * inner page), then no leaf tuples can go back to the current page; they
	 * must all go somewhere else.
	 */
	if (!SpGistBlockIsRoot(current->blkno))
		currentFreeSpace = PageGetExactFreeSpace(current->page) + spaceToDelete;
	else
		currentFreeSpace = 0;	/* prevent assigning any tuples to current */

	xlrec.initDest = false;

	if (totalLeafSizes <= currentFreeSpace)
	{
		/* All the leaf tuples will fit on current page */
		newLeafBuffer = InvalidBuffer;
		/* mark new leaf tuple as included in insertions, if allowed */
		if (includeNew)
		{
			nToInsert++;
			insertedNew = true;
		}
		for (i = 0; i < nToInsert; i++)
			leafPageSelect[i] = 0;	/* signifies current page */
	}
	else if (in.nTuples == 1 && totalLeafSizes > SPGIST_PAGE_CAPACITY)
	{
		/*
		 * We're trying to split up a long value by repeated suffixing, but
		 * it's not going to fit yet.  Don't bother allocating a second leaf
		 * buffer that we won't be able to use.
		 */
		newLeafBuffer = InvalidBuffer;
		Assert(includeNew);
		Assert(nToInsert == 0);
	}
	else
	{
		/* We will need another leaf page */
		uint8	   *nodePageSelect;
		int			curspace;
		int			newspace;

		newLeafBuffer = SpGistGetBuffer(index,
										GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
										Min(totalLeafSizes,
											SPGIST_PAGE_CAPACITY),
										&xlrec.initDest);

		/*
		 * Attempt to assign node groups to the two pages.  We might fail to
		 * do so, even if totalLeafSizes is less than the available space,
		 * because we can't split a group across pages.
		 */
		nodePageSelect = (uint8 *) palloc(sizeof(uint8) * out.nNodes);

		curspace = currentFreeSpace;
		newspace = PageGetExactFreeSpace(BufferGetPage(newLeafBuffer));
		for (i = 0; i < out.nNodes; i++)
		{
			if (leafSizes[i] <= curspace)
			{
				nodePageSelect[i] = 0;	/* signifies current page */
				curspace -= leafSizes[i];
			}
			else
			{
				nodePageSelect[i] = 1;	/* signifies new leaf page */
				newspace -= leafSizes[i];
			}
		}
		if (curspace >= 0 && newspace >= 0)
		{
			/* Successful assignment, so we can include the new leaf tuple */
			if (includeNew)
			{
				nToInsert++;
				insertedNew = true;
			}
		}
		else if (includeNew)
		{
			/* We must exclude the new leaf tuple from the split */
			int			nodeOfNewTuple = out.mapTuplesToNodes[in.nTuples - 1];

			leafSizes[nodeOfNewTuple] -=
				newLeafs[in.nTuples - 1]->size + sizeof(ItemIdData);

			/* Repeat the node assignment process --- should succeed now */
			curspace = currentFreeSpace;
			newspace = PageGetExactFreeSpace(BufferGetPage(newLeafBuffer));
			for (i = 0; i < out.nNodes; i++)
			{
				if (leafSizes[i] <= curspace)
				{
					nodePageSelect[i] = 0;	/* signifies current page */
					curspace -= leafSizes[i];
				}
				else
				{
					nodePageSelect[i] = 1;	/* signifies new leaf page */
					newspace -= leafSizes[i];
				}
			}
			if (curspace < 0 || newspace < 0)
				elog(ERROR, "failed to divide leaf tuple groups across pages");
		}
		else
		{
			/* oops, we already excluded new tuple ... should not get here */
			elog(ERROR, "failed to divide leaf tuple groups across pages");
		}
		/* Expand the per-node assignments to be shown per leaf tuple */
		for (i = 0; i < nToInsert; i++)
		{
			n = out.mapTuplesToNodes[i];
			leafPageSelect[i] = nodePageSelect[n];
		}
	}

	/* Start preparing WAL record */
	xlrec.nDelete = 0;
	xlrec.initSrc = isNew;
	xlrec.storesNulls = isNulls;
	xlrec.isRootSplit = SpGistBlockIsRoot(current->blkno);

	leafdata = leafptr = (char *) palloc(totalLeafSizes);

	/* Here we begin making the changes to the target pages */
	START_CRIT_SECTION();

	/*
	 * Delete old leaf tuples from current buffer, except when we're splitting
	 * the root; in that case there's no need because we'll re-init the page
	 * below.  We do this first to make room for reinserting new leaf tuples.
	 */
	if (!SpGistBlockIsRoot(current->blkno))
	{
		/*
		 * Init buffer instead of deleting individual tuples, but only if
		 * there aren't any other live tuples and only during build; otherwise
		 * we need to set a redirection tuple for concurrent scans.
		 */
		if (state->isBuild &&
			nToDelete + SpGistPageGetOpaque(current->page)->nPlaceholder ==
			PageGetMaxOffsetNumber(current->page))
		{
			SpGistInitBuffer(current->buffer,
							 SPGIST_LEAF | (isNulls ? SPGIST_NULLS : 0));
			xlrec.initSrc = true;
		}
		else if (isNew)
		{
			/* don't expose the freshly init'd buffer as a backup block */
			Assert(nToDelete == 0);
		}
		else
		{
			xlrec.nDelete = nToDelete;

			if (!state->isBuild)
			{
				/*
				 * Need to create redirect tuple (it will point to new inner
				 * tuple) but right now the new tuple's location is not known
				 * yet.  So, set the redirection pointer to "impossible" value
				 * and remember its position to update tuple later.
				 */
				if (nToDelete > 0)
					redirectTuplePos = toDelete[0];
				spgPageIndexMultiDelete(state, current->page,
										toDelete, nToDelete,
										SPGIST_REDIRECT,
										SPGIST_PLACEHOLDER,
										SPGIST_METAPAGE_BLKNO,
										FirstOffsetNumber);
			}
			else
			{
				/*
				 * During index build there is not concurrent searches, so we
				 * don't need to create redirection tuple.
				 */
				spgPageIndexMultiDelete(state, current->page,
										toDelete, nToDelete,
										SPGIST_PLACEHOLDER,
										SPGIST_PLACEHOLDER,
										InvalidBlockNumber,
										InvalidOffsetNumber);
			}
		}
	}

	/*
	 * Put leaf tuples on proper pages, and update downlinks in innerTuple's
	 * nodes.
	 */
	startOffsets[0] = startOffsets[1] = InvalidOffsetNumber;
	for (i = 0; i < nToInsert; i++)
	{
		SpGistLeafTuple it = newLeafs[i];
		Buffer		leafBuffer;
		BlockNumber leafBlock;
		OffsetNumber newoffset;

		/* Which page is it going to? */
		leafBuffer = leafPageSelect[i] ? newLeafBuffer : current->buffer;
		leafBlock = BufferGetBlockNumber(leafBuffer);

		/* Link tuple into correct chain for its node */
		n = out.mapTuplesToNodes[i];

		if (ItemPointerIsValid(&nodes[n]->t_tid))
		{
			Assert(ItemPointerGetBlockNumber(&nodes[n]->t_tid) == leafBlock);
			it->nextOffset = ItemPointerGetOffsetNumber(&nodes[n]->t_tid);
		}
		else
			it->nextOffset = InvalidOffsetNumber;

		/* Insert it on page */
		newoffset = SpGistPageAddNewItem(state, BufferGetPage(leafBuffer),
										 (Item) it, it->size,
										 &startOffsets[leafPageSelect[i]],
										 false);
		toInsert[i] = newoffset;

		/* ... and complete the chain linking */
		ItemPointerSet(&nodes[n]->t_tid, leafBlock, newoffset);

		/* Also copy leaf tuple into WAL data */
		memcpy(leafptr, newLeafs[i], newLeafs[i]->size);
		leafptr += newLeafs[i]->size;
	}

	/*
	 * We're done modifying the other leaf buffer (if any), so mark it dirty.
	 * current->buffer will be marked below, after we're entirely done
	 * modifying it.
	 */
	if (newLeafBuffer != InvalidBuffer)
	{
		MarkBufferDirty(newLeafBuffer);
	}

	/* Remember current buffer, since we're about to change "current" */
	saveCurrent = *current;

	/*
	 * Store the new innerTuple
	 */
	if (newInnerBuffer == parent->buffer && newInnerBuffer != InvalidBuffer)
	{
		/*
		 * new inner tuple goes to parent page
		 */
		Assert(current->buffer != parent->buffer);

		/* Repoint "current" at the new inner tuple */
		current->blkno = parent->blkno;
		current->buffer = parent->buffer;
		current->page = parent->page;
		xlrec.offnumInner = current->offnum =
			SpGistPageAddNewItem(state, current->page,
								 (Item) innerTuple, innerTuple->size,
								 NULL, false);

		/*
		 * Update parent node link and mark parent page dirty
		 */
		xlrec.innerIsParent = true;
		xlrec.offnumParent = parent->offnum;
		xlrec.nodeI = parent->node;
		saveNodeLink(index, parent, current->blkno, current->offnum);

		/*
		 * Update redirection link (in old current buffer)
		 */
		if (redirectTuplePos != InvalidOffsetNumber)
			setRedirectionTuple(&saveCurrent, redirectTuplePos,
								current->blkno, current->offnum);

		/* Done modifying old current buffer, mark it dirty */
		MarkBufferDirty(saveCurrent.buffer);
	}
	else if (parent->buffer != InvalidBuffer)
	{
		/*
		 * new inner tuple will be stored on a new page
		 */
		Assert(newInnerBuffer != InvalidBuffer);

		/* Repoint "current" at the new inner tuple */
		current->buffer = newInnerBuffer;
		current->blkno = BufferGetBlockNumber(current->buffer);
		current->page = BufferGetPage(current->buffer);
		xlrec.offnumInner = current->offnum =
			SpGistPageAddNewItem(state, current->page,
								 (Item) innerTuple, innerTuple->size,
								 NULL, false);

		/* Done modifying new current buffer, mark it dirty */
		MarkBufferDirty(current->buffer);

		/*
		 * Update parent node link and mark parent page dirty
		 */
		xlrec.innerIsParent = (parent->buffer == current->buffer);
		xlrec.offnumParent = parent->offnum;
		xlrec.nodeI = parent->node;
		saveNodeLink(index, parent, current->blkno, current->offnum);

		/*
		 * Update redirection link (in old current buffer)
		 */
		if (redirectTuplePos != InvalidOffsetNumber)
			setRedirectionTuple(&saveCurrent, redirectTuplePos,
								current->blkno, current->offnum);

		/* Done modifying old current buffer, mark it dirty */
		MarkBufferDirty(saveCurrent.buffer);
	}
	else
	{
		/*
		 * Splitting root page, which was a leaf but now becomes inner page
		 * (and so "current" continues to point at it)
		 */
		Assert(SpGistBlockIsRoot(current->blkno));
		Assert(redirectTuplePos == InvalidOffsetNumber);

		SpGistInitBuffer(current->buffer, (isNulls ? SPGIST_NULLS : 0));
		xlrec.initInner = true;
		xlrec.innerIsParent = false;

		xlrec.offnumInner = current->offnum =
			PageAddItem(current->page, (Item) innerTuple, innerTuple->size,
						InvalidOffsetNumber, false, false);
		if (current->offnum != FirstOffsetNumber)
			elog(ERROR, "failed to add item of size %u to SPGiST index page",
				 innerTuple->size);

		/* No parent link to update, nor redirection to do */
		xlrec.offnumParent = InvalidOffsetNumber;
		xlrec.nodeI = 0;

		/* Done modifying new current buffer, mark it dirty */
		MarkBufferDirty(current->buffer);

		/* saveCurrent doesn't represent a different buffer */
		saveCurrent.buffer = InvalidBuffer;
	}

	if (RelationNeedsWAL(index) && !state->isBuild)
	{
		XLogRecPtr	recptr;
		int			flags;

		XLogBeginInsert();

		xlrec.nInsert = nToInsert;
		XLogRegisterData((char *) &xlrec, SizeOfSpgxlogPickSplit);

		XLogRegisterData((char *) toDelete,
						 sizeof(OffsetNumber) * xlrec.nDelete);
		XLogRegisterData((char *) toInsert,
						 sizeof(OffsetNumber) * xlrec.nInsert);
		XLogRegisterData((char *) leafPageSelect,
						 sizeof(uint8) * xlrec.nInsert);
		XLogRegisterData((char *) innerTuple, innerTuple->size);
		XLogRegisterData(leafdata, leafptr - leafdata);

		/* Old leaf page */
		if (BufferIsValid(saveCurrent.buffer))
		{
			flags = REGBUF_STANDARD;
			if (xlrec.initSrc)
				flags |= REGBUF_WILL_INIT;
			XLogRegisterBuffer(0, saveCurrent.buffer, flags);
		}

		/* New leaf page */
		if (BufferIsValid(newLeafBuffer))
		{
			flags = REGBUF_STANDARD;
			if (xlrec.initDest)
				flags |= REGBUF_WILL_INIT;
			XLogRegisterBuffer(1, newLeafBuffer, flags);
		}

		/* Inner page */
		flags = REGBUF_STANDARD;
		if (xlrec.initInner)
			flags |= REGBUF_WILL_INIT;
		XLogRegisterBuffer(2, current->buffer, flags);

		/* Parent page, if different from inner page */
		if (parent->buffer != InvalidBuffer)
		{
			if (parent->buffer != current->buffer)
				XLogRegisterBuffer(3, parent->buffer, REGBUF_STANDARD);
			else
				Assert(xlrec.innerIsParent);
		}

		/* Issue the WAL record */
		recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_PICKSPLIT);

		/* Update page LSNs on all affected pages */
		if (newLeafBuffer != InvalidBuffer)
		{
			Page		page = BufferGetPage(newLeafBuffer);

			PageSetLSN(page, recptr);
		}

		if (saveCurrent.buffer != InvalidBuffer)
		{
			Page		page = BufferGetPage(saveCurrent.buffer);

			PageSetLSN(page, recptr);
		}

		PageSetLSN(current->page, recptr);

		if (parent->buffer != InvalidBuffer)
		{
			PageSetLSN(parent->page, recptr);
		}
	}

	END_CRIT_SECTION();

	/* Update local free-space cache and unlock buffers */
	if (newLeafBuffer != InvalidBuffer)
	{
		SpGistSetLastUsedPage(index, newLeafBuffer);
		UnlockReleaseBuffer(newLeafBuffer);
	}
	if (saveCurrent.buffer != InvalidBuffer)
	{
		SpGistSetLastUsedPage(index, saveCurrent.buffer);
		UnlockReleaseBuffer(saveCurrent.buffer);
	}

	return insertedNew;
}

/*
 * spgMatchNode action: descend to N'th child node of current inner tuple
 */
static void
spgMatchNodeAction(Relation index, SpGistState *state,
				   SpGistInnerTuple innerTuple,
				   SPPageDesc *current, SPPageDesc *parent, int nodeN)
{
	int			i;
	SpGistNodeTuple node;

	/* Release previous parent buffer if any */
	if (parent->buffer != InvalidBuffer &&
		parent->buffer != current->buffer)
	{
		SpGistSetLastUsedPage(index, parent->buffer);
		UnlockReleaseBuffer(parent->buffer);
	}

	/* Repoint parent to specified node of current inner tuple */
	parent->blkno = current->blkno;
	parent->buffer = current->buffer;
	parent->page = current->page;
	parent->offnum = current->offnum;
	parent->node = nodeN;

	/* Locate that node */
	SGITITERATE(innerTuple, i, node)
	{
		if (i == nodeN)
			break;
	}

	if (i != nodeN)
		elog(ERROR, "failed to find requested node %d in SPGiST inner tuple",
			 nodeN);

	/* Point current to the downlink location, if any */
	if (ItemPointerIsValid(&node->t_tid))
	{
		current->blkno = ItemPointerGetBlockNumber(&node->t_tid);
		current->offnum = ItemPointerGetOffsetNumber(&node->t_tid);
	}
	else
	{
		/* Downlink is empty, so we'll need to find a new page */
		current->blkno = InvalidBlockNumber;
		current->offnum = InvalidOffsetNumber;
	}

	current->buffer = InvalidBuffer;
	current->page = NULL;
}

/*
 * spgAddNode action: add a node to the inner tuple at current
 */
static void
spgAddNodeAction(Relation index, SpGistState *state,
				 SpGistInnerTuple innerTuple,
				 SPPageDesc *current, SPPageDesc *parent,
				 int nodeN, Datum nodeLabel)
{
	SpGistInnerTuple newInnerTuple;
	spgxlogAddNode xlrec;

	/* Should not be applied to nulls */
	Assert(!SpGistPageStoresNulls(current->page));

	/* Construct new inner tuple with additional node */
	newInnerTuple = addNode(state, innerTuple, nodeLabel, nodeN);

	/* Prepare WAL record */
	STORE_STATE(state, xlrec.stateSrc);
	xlrec.offnum = current->offnum;

	/* we don't fill these unless we need to change the parent downlink */
	xlrec.parentBlk = -1;
	xlrec.offnumParent = InvalidOffsetNumber;
	xlrec.nodeI = 0;

	/* we don't fill these unless tuple has to be moved */
	xlrec.offnumNew = InvalidOffsetNumber;
	xlrec.newPage = false;

	if (PageGetExactFreeSpace(current->page) >=
		newInnerTuple->size - innerTuple->size)
	{
		/*
		 * We can replace the inner tuple by new version in-place
		 */
		START_CRIT_SECTION();

		PageIndexTupleDelete(current->page, current->offnum);
		if (PageAddItem(current->page,
						(Item) newInnerTuple, newInnerTuple->size,
						current->offnum, false, false) != current->offnum)
			elog(ERROR, "failed to add item of size %u to SPGiST index page",
				 newInnerTuple->size);

		MarkBufferDirty(current->buffer);

		if (RelationNeedsWAL(index) && !state->isBuild)
		{
			XLogRecPtr	recptr;

			XLogBeginInsert();
			XLogRegisterData((char *) &xlrec, sizeof(xlrec));
			XLogRegisterData((char *) newInnerTuple, newInnerTuple->size);

			XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD);

			recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE);

			PageSetLSN(current->page, recptr);
		}

		END_CRIT_SECTION();
	}
	else
	{
		/*
		 * move inner tuple to another page, and update parent
		 */
		SpGistDeadTuple dt;
		SPPageDesc	saveCurrent;

		/*
		 * It should not be possible to get here for the root page, since we
		 * allow only one inner tuple on the root page, and spgFormInnerTuple
		 * always checks that inner tuples don't exceed the size of a page.
		 */
		if (SpGistBlockIsRoot(current->blkno))
			elog(ERROR, "cannot enlarge root tuple any more");
		Assert(parent->buffer != InvalidBuffer);

		saveCurrent = *current;

		xlrec.offnumParent = parent->offnum;
		xlrec.nodeI = parent->node;

		/*
		 * obtain new buffer with the same parity as current, since it will be
		 * a child of same parent tuple
		 */
		current->buffer = SpGistGetBuffer(index,
										  GBUF_INNER_PARITY(current->blkno),
										  newInnerTuple->size + sizeof(ItemIdData),
										  &xlrec.newPage);
		current->blkno = BufferGetBlockNumber(current->buffer);
		current->page = BufferGetPage(current->buffer);

		/*
		 * Let's just make real sure new current isn't same as old.  Right now
		 * that's impossible, but if SpGistGetBuffer ever got smart enough to
		 * delete placeholder tuples before checking space, maybe it wouldn't
		 * be impossible.  The case would appear to work except that WAL
		 * replay would be subtly wrong, so I think a mere assert isn't enough
		 * here.
		 */
		if (current->blkno == saveCurrent.blkno)
			elog(ERROR, "SPGiST new buffer shouldn't be same as old buffer");

		/*
		 * New current and parent buffer will both be modified; but note that
		 * parent buffer could be same as either new or old current.
		 */
		if (parent->buffer == saveCurrent.buffer)
			xlrec.parentBlk = 0;
		else if (parent->buffer == current->buffer)
			xlrec.parentBlk = 1;
		else
			xlrec.parentBlk = 2;

		START_CRIT_SECTION();

		/* insert new ... */
		xlrec.offnumNew = current->offnum =
			SpGistPageAddNewItem(state, current->page,
								 (Item) newInnerTuple, newInnerTuple->size,
								 NULL, false);

		MarkBufferDirty(current->buffer);

		/* update parent's downlink and mark parent page dirty */
		saveNodeLink(index, parent, current->blkno, current->offnum);

		/*
		 * Replace old tuple with a placeholder or redirection tuple.  Unless
		 * doing an index build, we have to insert a redirection tuple for
		 * possible concurrent scans.  We can't just delete it in any case,
		 * because that could change the offsets of other tuples on the page,
		 * breaking downlinks from their parents.
		 */
		if (state->isBuild)
			dt = spgFormDeadTuple(state, SPGIST_PLACEHOLDER,
								  InvalidBlockNumber, InvalidOffsetNumber);
		else
			dt = spgFormDeadTuple(state, SPGIST_REDIRECT,
								  current->blkno, current->offnum);

		PageIndexTupleDelete(saveCurrent.page, saveCurrent.offnum);
		if (PageAddItem(saveCurrent.page, (Item) dt, dt->size,
						saveCurrent.offnum,
						false, false) != saveCurrent.offnum)
			elog(ERROR, "failed to add item of size %u to SPGiST index page",
				 dt->size);

		if (state->isBuild)
			SpGistPageGetOpaque(saveCurrent.page)->nPlaceholder++;
		else
			SpGistPageGetOpaque(saveCurrent.page)->nRedirection++;

		MarkBufferDirty(saveCurrent.buffer);

		if (RelationNeedsWAL(index) && !state->isBuild)
		{
			XLogRecPtr	recptr;
			int			flags;

			XLogBeginInsert();

			/* orig page */
			XLogRegisterBuffer(0, saveCurrent.buffer, REGBUF_STANDARD);
			/* new page */
			flags = REGBUF_STANDARD;
			if (xlrec.newPage)
				flags |= REGBUF_WILL_INIT;
			XLogRegisterBuffer(1, current->buffer, flags);
			/* parent page (if different from orig and new) */
			if (xlrec.parentBlk == 2)
				XLogRegisterBuffer(2, parent->buffer, REGBUF_STANDARD);

			XLogRegisterData((char *) &xlrec, sizeof(xlrec));
			XLogRegisterData((char *) newInnerTuple, newInnerTuple->size);

			recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE);

			/* we don't bother to check if any of these are redundant */
			PageSetLSN(current->page, recptr);
			PageSetLSN(parent->page, recptr);
			PageSetLSN(saveCurrent.page, recptr);
		}

		END_CRIT_SECTION();

		/* Release saveCurrent if it's not same as current or parent */
		if (saveCurrent.buffer != current->buffer &&
			saveCurrent.buffer != parent->buffer)
		{
			SpGistSetLastUsedPage(index, saveCurrent.buffer);
			UnlockReleaseBuffer(saveCurrent.buffer);
		}
	}
}

/*
 * spgSplitNode action: split inner tuple at current into prefix and postfix
 */
static void
spgSplitNodeAction(Relation index, SpGistState *state,
				   SpGistInnerTuple innerTuple,
				   SPPageDesc *current, spgChooseOut *out)
{
	SpGistInnerTuple prefixTuple,
				postfixTuple;
	SpGistNodeTuple node,
			   *nodes;
	BlockNumber postfixBlkno;
	OffsetNumber postfixOffset;
	int			i;
	spgxlogSplitTuple xlrec;
	Buffer		newBuffer = InvalidBuffer;

	/* Should not be applied to nulls */
	Assert(!SpGistPageStoresNulls(current->page));

	/* Check opclass gave us sane values */
	if (out->result.splitTuple.prefixNNodes <= 0 ||
		out->result.splitTuple.prefixNNodes > SGITMAXNNODES)
		elog(ERROR, "invalid number of prefix nodes: %d",
			 out->result.splitTuple.prefixNNodes);
	if (out->result.splitTuple.childNodeN < 0 ||
		out->result.splitTuple.childNodeN >=
		out->result.splitTuple.prefixNNodes)
		elog(ERROR, "invalid child node number: %d",
			 out->result.splitTuple.childNodeN);

	/*
	 * Construct new prefix tuple with requested number of nodes.  We'll fill
	 * in the childNodeN'th node's downlink below.
	 */
	nodes = (SpGistNodeTuple *) palloc(sizeof(SpGistNodeTuple) *
									   out->result.splitTuple.prefixNNodes);

	for (i = 0; i < out->result.splitTuple.prefixNNodes; i++)
	{
		Datum		label = (Datum) 0;
		bool		labelisnull;

		labelisnull = (out->result.splitTuple.prefixNodeLabels == NULL);
		if (!labelisnull)
			label = out->result.splitTuple.prefixNodeLabels[i];
		nodes[i] = spgFormNodeTuple(state, label, labelisnull);
	}

	prefixTuple = spgFormInnerTuple(state,
									out->result.splitTuple.prefixHasPrefix,
									out->result.splitTuple.prefixPrefixDatum,
									out->result.splitTuple.prefixNNodes,
									nodes);

	/* it must fit in the space that innerTuple now occupies */
	if (prefixTuple->size > innerTuple->size)
		elog(ERROR, "SPGiST inner-tuple split must not produce longer prefix");

	/*
	 * Construct new postfix tuple, containing all nodes of innerTuple with
	 * same node datums, but with the prefix specified by the picksplit
	 * function.
	 */
	nodes = palloc(sizeof(SpGistNodeTuple) * innerTuple->nNodes);
	SGITITERATE(innerTuple, i, node)
	{
		nodes[i] = node;
	}

	postfixTuple = spgFormInnerTuple(state,
									 out->result.splitTuple.postfixHasPrefix,
									 out->result.splitTuple.postfixPrefixDatum,
									 innerTuple->nNodes, nodes);

	/* Postfix tuple is allTheSame if original tuple was */
	postfixTuple->allTheSame = innerTuple->allTheSame;

	/* prep data for WAL record */
	xlrec.newPage = false;

	/*
	 * If we can't fit both tuples on the current page, get a new page for the
	 * postfix tuple.  In particular, can't split to the root page.
	 *
	 * For the space calculation, note that prefixTuple replaces innerTuple
	 * but postfixTuple will be a new entry.
	 */
	if (SpGistBlockIsRoot(current->blkno) ||
		SpGistPageGetFreeSpace(current->page, 1) + innerTuple->size <
		prefixTuple->size + postfixTuple->size + sizeof(ItemIdData))
	{
		/*
		 * Choose page with next triple parity, because postfix tuple is a
		 * child of prefix one
		 */
		newBuffer = SpGistGetBuffer(index,
									GBUF_INNER_PARITY(current->blkno + 1),
									postfixTuple->size + sizeof(ItemIdData),
									&xlrec.newPage);
	}

	START_CRIT_SECTION();

	/*
	 * Replace old tuple by prefix tuple
	 */
	PageIndexTupleDelete(current->page, current->offnum);
	xlrec.offnumPrefix = PageAddItem(current->page,
									 (Item) prefixTuple, prefixTuple->size,
									 current->offnum, false, false);
	if (xlrec.offnumPrefix != current->offnum)
		elog(ERROR, "failed to add item of size %u to SPGiST index page",
			 prefixTuple->size);

	/*
	 * put postfix tuple into appropriate page
	 */
	if (newBuffer == InvalidBuffer)
	{
		postfixBlkno = current->blkno;
		xlrec.offnumPostfix = postfixOffset =
			SpGistPageAddNewItem(state, current->page,
								 (Item) postfixTuple, postfixTuple->size,
								 NULL, false);
		xlrec.postfixBlkSame = true;
	}
	else
	{
		postfixBlkno = BufferGetBlockNumber(newBuffer);
		xlrec.offnumPostfix = postfixOffset =
			SpGistPageAddNewItem(state, BufferGetPage(newBuffer),
								 (Item) postfixTuple, postfixTuple->size,
								 NULL, false);
		MarkBufferDirty(newBuffer);
		xlrec.postfixBlkSame = false;
	}

	/*
	 * And set downlink pointer in the prefix tuple to point to postfix tuple.
	 * (We can't avoid this step by doing the above two steps in opposite
	 * order, because there might not be enough space on the page to insert
	 * the postfix tuple first.)  We have to update the local copy of the
	 * prefixTuple too, because that's what will be written to WAL.
	 */
	spgUpdateNodeLink(prefixTuple, out->result.splitTuple.childNodeN,
					  postfixBlkno, postfixOffset);
	prefixTuple = (SpGistInnerTuple) PageGetItem(current->page,
												 PageGetItemId(current->page, current->offnum));
	spgUpdateNodeLink(prefixTuple, out->result.splitTuple.childNodeN,
					  postfixBlkno, postfixOffset);

	MarkBufferDirty(current->buffer);

	if (RelationNeedsWAL(index) && !state->isBuild)
	{
		XLogRecPtr	recptr;

		XLogBeginInsert();
		XLogRegisterData((char *) &xlrec, sizeof(xlrec));
		XLogRegisterData((char *) prefixTuple, prefixTuple->size);
		XLogRegisterData((char *) postfixTuple, postfixTuple->size);

		XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD);
		if (newBuffer != InvalidBuffer)
		{
			int			flags;

			flags = REGBUF_STANDARD;
			if (xlrec.newPage)
				flags |= REGBUF_WILL_INIT;
			XLogRegisterBuffer(1, newBuffer, flags);
		}

		recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_SPLIT_TUPLE);

		PageSetLSN(current->page, recptr);

		if (newBuffer != InvalidBuffer)
		{
			PageSetLSN(BufferGetPage(newBuffer), recptr);
		}
	}

	END_CRIT_SECTION();

	/* Update local free-space cache and release buffer */
	if (newBuffer != InvalidBuffer)
	{
		SpGistSetLastUsedPage(index, newBuffer);
		UnlockReleaseBuffer(newBuffer);
	}
}

/*
 * Insert one item into the index.
 *
 * Returns true on success, false if we failed to complete the insertion
 * because of conflict with a concurrent insert.  In the latter case,
 * caller should re-call spgdoinsert() with the same args.
 */
bool
spgdoinsert(Relation index, SpGistState *state,
			ItemPointer heapPtr, Datum datum, bool isnull)
{
	int			level = 0;
	Datum		leafDatum;
	int			leafSize;
	SPPageDesc	current,
				parent;
	FmgrInfo   *procinfo = NULL;

	/*
	 * Look up FmgrInfo of the user-defined choose function once, to save
	 * cycles in the loop below.
	 */
	if (!isnull)
		procinfo = index_getprocinfo(index, 1, SPGIST_CHOOSE_PROC);

	/*
	 * Prepare the leaf datum to insert.
	 *
	 * If an optional "compress" method is provided, then call it to form the
	 * leaf datum from the input datum.  Otherwise store the input datum as
	 * is.  Since we don't use index_form_tuple in this AM, we have to make
	 * sure value to be inserted is not toasted; FormIndexDatum doesn't
	 * guarantee that.  But we assume the "compress" method to return an
	 * untoasted value.
	 */
	if (!isnull)
	{
		if (OidIsValid(index_getprocid(index, 1, SPGIST_COMPRESS_PROC)))
		{
			FmgrInfo   *compressProcinfo = NULL;

			compressProcinfo = index_getprocinfo(index, 1, SPGIST_COMPRESS_PROC);
			leafDatum = FunctionCall1Coll(compressProcinfo,
										  index->rd_indcollation[0],
										  datum);
		}
		else
		{
			Assert(state->attLeafType.type == state->attType.type);

			if (state->attType.attlen == -1)
				leafDatum = PointerGetDatum(PG_DETOAST_DATUM(datum));
			else
				leafDatum = datum;
		}
	}
	else
		leafDatum = (Datum) 0;

	/*
	 * Compute space needed for a leaf tuple containing the given datum.
	 *
	 * If it isn't gonna fit, and the opclass can't reduce the datum size by
	 * suffixing, bail out now rather than getting into an endless loop.
	 */
	if (!isnull)
		leafSize = SGLTHDRSZ + sizeof(ItemIdData) +
			SpGistGetTypeSize(&state->attLeafType, leafDatum);
	else
		leafSize = SGDTSIZE + sizeof(ItemIdData);

	if (leafSize > SPGIST_PAGE_CAPACITY && !state->config.longValuesOK)
		ereport(ERROR,
				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
				 errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
						leafSize - sizeof(ItemIdData),
						SPGIST_PAGE_CAPACITY - sizeof(ItemIdData),
						RelationGetRelationName(index)),
				 errhint("Values larger than a buffer page cannot be indexed.")));

	/* Initialize "current" to the appropriate root page */
	current.blkno = isnull ? SPGIST_NULL_BLKNO : SPGIST_ROOT_BLKNO;
	current.buffer = InvalidBuffer;
	current.page = NULL;
	current.offnum = FirstOffsetNumber;
	current.node = -1;

	/* "parent" is invalid for the moment */
	parent.blkno = InvalidBlockNumber;
	parent.buffer = InvalidBuffer;
	parent.page = NULL;
	parent.offnum = InvalidOffsetNumber;
	parent.node = -1;

	for (;;)
	{
		bool		isNew = false;

		/*
		 * Bail out if query cancel is pending.  We must have this somewhere
		 * in the loop since a broken opclass could produce an infinite
		 * picksplit loop.
		 */
		CHECK_FOR_INTERRUPTS();

		if (current.blkno == InvalidBlockNumber)
		{
			/*
			 * Create a leaf page.  If leafSize is too large to fit on a page,
			 * we won't actually use the page yet, but it simplifies the API
			 * for doPickSplit to always have a leaf page at hand; so just
			 * quietly limit our request to a page size.
			 */
			current.buffer =
				SpGistGetBuffer(index,
								GBUF_LEAF | (isnull ? GBUF_NULLS : 0),
								Min(leafSize, SPGIST_PAGE_CAPACITY),
								&isNew);
			current.blkno = BufferGetBlockNumber(current.buffer);
		}
		else if (parent.buffer == InvalidBuffer)
		{
			/* we hold no parent-page lock, so no deadlock is possible */
			current.buffer = ReadBuffer(index, current.blkno);
			LockBuffer(current.buffer, BUFFER_LOCK_EXCLUSIVE);
		}
		else if (current.blkno != parent.blkno)
		{
			/* descend to a new child page */
			current.buffer = ReadBuffer(index, current.blkno);

			/*
			 * Attempt to acquire lock on child page.  We must beware of
			 * deadlock against another insertion process descending from that
			 * page to our parent page (see README).  If we fail to get lock,
			 * abandon the insertion and tell our caller to start over.
			 *
			 * XXX this could be improved, because failing to get lock on a
			 * buffer is not proof of a deadlock situation; the lock might be
			 * held by a reader, or even just background writer/checkpointer
			 * process.  Perhaps it'd be worth retrying after sleeping a bit?
			 */
			if (!ConditionalLockBuffer(current.buffer))
			{
				ReleaseBuffer(current.buffer);
				UnlockReleaseBuffer(parent.buffer);
				return false;
			}
		}
		else
		{
			/* inner tuple can be stored on the same page as parent one */
			current.buffer = parent.buffer;
		}
		current.page = BufferGetPage(current.buffer);

		/* should not arrive at a page of the wrong type */
		if (isnull ? !SpGistPageStoresNulls(current.page) :
			SpGistPageStoresNulls(current.page))
			elog(ERROR, "SPGiST index page %u has wrong nulls flag",
				 current.blkno);

		if (SpGistPageIsLeaf(current.page))
		{
			SpGistLeafTuple leafTuple;
			int			nToSplit,
						sizeToSplit;

			leafTuple = spgFormLeafTuple(state, heapPtr, leafDatum, isnull);
			if (leafTuple->size + sizeof(ItemIdData) <=
				SpGistPageGetFreeSpace(current.page, 1))
			{
				/* it fits on page, so insert it and we're done */
				addLeafTuple(index, state, leafTuple,
							 &current, &parent, isnull, isNew);
				break;
			}
			else if ((sizeToSplit =
					  checkSplitConditions(index, state, &current,
										   &nToSplit)) < SPGIST_PAGE_CAPACITY / 2 &&
					 nToSplit < 64 &&
					 leafTuple->size + sizeof(ItemIdData) + sizeToSplit <= SPGIST_PAGE_CAPACITY)
			{
				/*
				 * the amount of data is pretty small, so just move the whole
				 * chain to another leaf page rather than splitting it.
				 */
				Assert(!isNew);
				moveLeafs(index, state, &current, &parent, leafTuple, isnull);
				break;			/* we're done */
			}
			else
			{
				/* picksplit */
				if (doPickSplit(index, state, &current, &parent,
								leafTuple, level, isnull, isNew))
					break;		/* doPickSplit installed new tuples */

				/* leaf tuple will not be inserted yet */
				pfree(leafTuple);

				/*
				 * current now describes new inner tuple, go insert into it
				 */
				Assert(!SpGistPageIsLeaf(current.page));
				goto process_inner_tuple;
			}
		}
		else					/* non-leaf page */
		{
			/*
			 * Apply the opclass choose function to figure out how to insert
			 * the given datum into the current inner tuple.
			 */
			SpGistInnerTuple innerTuple;
			spgChooseIn in;
			spgChooseOut out;

			/*
			 * spgAddNode and spgSplitTuple cases will loop back to here to
			 * complete the insertion operation.  Just in case the choose
			 * function is broken and produces add or split requests
			 * repeatedly, check for query cancel.
			 */
	process_inner_tuple:
			CHECK_FOR_INTERRUPTS();

			innerTuple = (SpGistInnerTuple) PageGetItem(current.page,
														PageGetItemId(current.page, current.offnum));

			in.datum = datum;
			in.leafDatum = leafDatum;
			in.level = level;
			in.allTheSame = innerTuple->allTheSame;
			in.hasPrefix = (innerTuple->prefixSize > 0);
			in.prefixDatum = SGITDATUM(innerTuple, state);
			in.nNodes = innerTuple->nNodes;
			in.nodeLabels = spgExtractNodeLabels(state, innerTuple);

			memset(&out, 0, sizeof(out));

			if (!isnull)
			{
				/* use user-defined choose method */
				FunctionCall2Coll(procinfo,
								  index->rd_indcollation[0],
								  PointerGetDatum(&in),
								  PointerGetDatum(&out));
			}
			else
			{
				/* force "match" action (to insert to random subnode) */
				out.resultType = spgMatchNode;
			}

			if (innerTuple->allTheSame)
			{
				/*
				 * It's not allowed to do an AddNode at an allTheSame tuple.
				 * Opclass must say "match", in which case we choose a random
				 * one of the nodes to descend into, or "split".
				 */
				if (out.resultType == spgAddNode)
					elog(ERROR, "cannot add a node to an allTheSame inner tuple");
				else if (out.resultType == spgMatchNode)
					out.result.matchNode.nodeN = random() % innerTuple->nNodes;
			}

			switch (out.resultType)
			{
				case spgMatchNode:
					/* Descend to N'th child node */
					spgMatchNodeAction(index, state, innerTuple,
									   &current, &parent,
									   out.result.matchNode.nodeN);
					/* Adjust level as per opclass request */
					level += out.result.matchNode.levelAdd;
					/* Replace leafDatum and recompute leafSize */
					if (!isnull)
					{
						leafDatum = out.result.matchNode.restDatum;
						leafSize = SGLTHDRSZ + sizeof(ItemIdData) +
							SpGistGetTypeSize(&state->attLeafType, leafDatum);
					}

					/*
					 * Loop around and attempt to insert the new leafDatum at
					 * "current" (which might reference an existing child
					 * tuple, or might be invalid to force us to find a new
					 * page for the tuple).
					 *
					 * Note: if the opclass sets longValuesOK, we rely on the
					 * choose function to eventually shorten the leafDatum
					 * enough to fit on a page.  We could add a test here to
					 * complain if the datum doesn't get visibly shorter each
					 * time, but that could get in the way of opclasses that
					 * "simplify" datums in a way that doesn't necessarily
					 * lead to physical shortening on every cycle.
					 */
					break;
				case spgAddNode:
					/* AddNode is not sensible if nodes don't have labels */
					if (in.nodeLabels == NULL)
						elog(ERROR, "cannot add a node to an inner tuple without node labels");
					/* Add node to inner tuple, per request */
					spgAddNodeAction(index, state, innerTuple,
									 &current, &parent,
									 out.result.addNode.nodeN,
									 out.result.addNode.nodeLabel);

					/*
					 * Retry insertion into the enlarged node.  We assume that
					 * we'll get a MatchNode result this time.
					 */
					goto process_inner_tuple;
					break;
				case spgSplitTuple:
					/* Split inner tuple, per request */
					spgSplitNodeAction(index, state, innerTuple,
									   &current, &out);

					/* Retry insertion into the split node */
					goto process_inner_tuple;
					break;
				default:
					elog(ERROR, "unrecognized SPGiST choose result: %d",
						 (int) out.resultType);
					break;
			}
		}
	}							/* end loop */

	/*
	 * Release any buffers we're still holding.  Beware of possibility that
	 * current and parent reference same buffer.
	 */
	if (current.buffer != InvalidBuffer)
	{
		SpGistSetLastUsedPage(index, current.buffer);
		UnlockReleaseBuffer(current.buffer);
	}
	if (parent.buffer != InvalidBuffer &&
		parent.buffer != current.buffer)
	{
		SpGistSetLastUsedPage(index, parent.buffer);
		UnlockReleaseBuffer(parent.buffer);
	}

	return true;
}

相关信息

greenplumn 源码目录

相关文章

greenplumn spginsert 源码

greenplumn spgkdtreeproc 源码

greenplumn spgproc 源码

greenplumn spgquadtreeproc 源码

greenplumn spgscan 源码

greenplumn spgtextproc 源码

greenplumn spgutils 源码

greenplumn spgvacuum 源码

greenplumn spgvalidate 源码

greenplumn spgxlog 源码

0  赞