greenplumn spgdoinsert 源码
greenplumn spgdoinsert 代码
文件路径:/src/backend/access/spgist/spgdoinsert.c
/*-------------------------------------------------------------------------
*
* spgdoinsert.c
* implementation of insert algorithm
*
*
* Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* src/backend/access/spgist/spgdoinsert.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/genam.h"
#include "access/spgist_private.h"
#include "access/spgxlog.h"
#include "access/xloginsert.h"
#include "miscadmin.h"
#include "storage/bufmgr.h"
#include "utils/rel.h"
/*
* SPPageDesc tracks all info about a page we are inserting into. In some
* situations it actually identifies a tuple, or even a specific node within
* an inner tuple. But any of the fields can be invalid. If the buffer
* field is valid, it implies we hold pin and exclusive lock on that buffer.
* page pointer should be valid exactly when buffer is.
*/
typedef struct SPPageDesc
{
BlockNumber blkno; /* block number, or InvalidBlockNumber */
Buffer buffer; /* page's buffer number, or InvalidBuffer */
Page page; /* pointer to page buffer, or NULL */
OffsetNumber offnum; /* offset of tuple, or InvalidOffsetNumber */
int node; /* node number within inner tuple, or -1 */
} SPPageDesc;
/*
* Set the item pointer in the nodeN'th entry in inner tuple tup. This
* is used to update the parent inner tuple's downlink after a move or
* split operation.
*/
void
spgUpdateNodeLink(SpGistInnerTuple tup, int nodeN,
BlockNumber blkno, OffsetNumber offset)
{
int i;
SpGistNodeTuple node;
SGITITERATE(tup, i, node)
{
if (i == nodeN)
{
ItemPointerSet(&node->t_tid, blkno, offset);
return;
}
}
elog(ERROR, "failed to find requested node %d in SPGiST inner tuple",
nodeN);
}
/*
* Form a new inner tuple containing one more node than the given one, with
* the specified label datum, inserted at offset "offset" in the node array.
* The new tuple's prefix is the same as the old one's.
*
* Note that the new node initially has an invalid downlink. We'll find a
* page to point it to later.
*/
static SpGistInnerTuple
addNode(SpGistState *state, SpGistInnerTuple tuple, Datum label, int offset)
{
SpGistNodeTuple node,
*nodes;
int i;
/* if offset is negative, insert at end */
if (offset < 0)
offset = tuple->nNodes;
else if (offset > tuple->nNodes)
elog(ERROR, "invalid offset for adding node to SPGiST inner tuple");
nodes = palloc(sizeof(SpGistNodeTuple) * (tuple->nNodes + 1));
SGITITERATE(tuple, i, node)
{
if (i < offset)
nodes[i] = node;
else
nodes[i + 1] = node;
}
nodes[offset] = spgFormNodeTuple(state, label, false);
return spgFormInnerTuple(state,
(tuple->prefixSize > 0),
SGITDATUM(tuple, state),
tuple->nNodes + 1,
nodes);
}
/* qsort comparator for sorting OffsetNumbers */
static int
cmpOffsetNumbers(const void *a, const void *b)
{
if (*(const OffsetNumber *) a == *(const OffsetNumber *) b)
return 0;
return (*(const OffsetNumber *) a > *(const OffsetNumber *) b) ? 1 : -1;
}
/*
* Delete multiple tuples from an index page, preserving tuple offset numbers.
*
* The first tuple in the given list is replaced with a dead tuple of type
* "firststate" (REDIRECT/DEAD/PLACEHOLDER); the remaining tuples are replaced
* with dead tuples of type "reststate". If either firststate or reststate
* is REDIRECT, blkno/offnum specify where to link to.
*
* NB: this is used during WAL replay, so beware of trying to make it too
* smart. In particular, it shouldn't use "state" except for calling
* spgFormDeadTuple(). This is also used in a critical section, so no
* pallocs either!
*/
void
spgPageIndexMultiDelete(SpGistState *state, Page page,
OffsetNumber *itemnos, int nitems,
int firststate, int reststate,
BlockNumber blkno, OffsetNumber offnum)
{
OffsetNumber firstItem;
OffsetNumber sortednos[MaxIndexTuplesPerPage];
SpGistDeadTuple tuple = NULL;
int i;
if (nitems == 0)
return; /* nothing to do */
/*
* For efficiency we want to use PageIndexMultiDelete, which requires the
* targets to be listed in sorted order, so we have to sort the itemnos
* array. (This also greatly simplifies the math for reinserting the
* replacement tuples.) However, we must not scribble on the caller's
* array, so we have to make a copy.
*/
memcpy(sortednos, itemnos, sizeof(OffsetNumber) * nitems);
if (nitems > 1)
qsort(sortednos, nitems, sizeof(OffsetNumber), cmpOffsetNumbers);
PageIndexMultiDelete(page, sortednos, nitems);
firstItem = itemnos[0];
for (i = 0; i < nitems; i++)
{
OffsetNumber itemno = sortednos[i];
int tupstate;
tupstate = (itemno == firstItem) ? firststate : reststate;
if (tuple == NULL || tuple->tupstate != tupstate)
tuple = spgFormDeadTuple(state, tupstate, blkno, offnum);
if (PageAddItem(page, (Item) tuple, tuple->size,
itemno, false, false) != itemno)
elog(ERROR, "failed to add item of size %u to SPGiST index page",
tuple->size);
if (tupstate == SPGIST_REDIRECT)
SpGistPageGetOpaque(page)->nRedirection++;
else if (tupstate == SPGIST_PLACEHOLDER)
SpGistPageGetOpaque(page)->nPlaceholder++;
}
}
/*
* Update the parent inner tuple's downlink, and mark the parent buffer
* dirty (this must be the last change to the parent page in the current
* WAL action).
*/
static void
saveNodeLink(Relation index, SPPageDesc *parent,
BlockNumber blkno, OffsetNumber offnum)
{
SpGistInnerTuple innerTuple;
innerTuple = (SpGistInnerTuple) PageGetItem(parent->page,
PageGetItemId(parent->page, parent->offnum));
spgUpdateNodeLink(innerTuple, parent->node, blkno, offnum);
MarkBufferDirty(parent->buffer);
}
/*
* Add a leaf tuple to a leaf page where there is known to be room for it
*/
static void
addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple,
SPPageDesc *current, SPPageDesc *parent, bool isNulls, bool isNew)
{
spgxlogAddLeaf xlrec;
xlrec.newPage = isNew;
xlrec.storesNulls = isNulls;
/* these will be filled below as needed */
xlrec.offnumLeaf = InvalidOffsetNumber;
xlrec.offnumHeadLeaf = InvalidOffsetNumber;
xlrec.offnumParent = InvalidOffsetNumber;
xlrec.nodeI = 0;
START_CRIT_SECTION();
if (current->offnum == InvalidOffsetNumber ||
SpGistBlockIsRoot(current->blkno))
{
/* Tuple is not part of a chain */
leafTuple->nextOffset = InvalidOffsetNumber;
current->offnum = SpGistPageAddNewItem(state, current->page,
(Item) leafTuple, leafTuple->size,
NULL, false);
xlrec.offnumLeaf = current->offnum;
/* Must update parent's downlink if any */
if (parent->buffer != InvalidBuffer)
{
xlrec.offnumParent = parent->offnum;
xlrec.nodeI = parent->node;
saveNodeLink(index, parent, current->blkno, current->offnum);
}
}
else
{
/*
* Tuple must be inserted into existing chain. We mustn't change the
* chain's head address, but we don't need to chase the entire chain
* to put the tuple at the end; we can insert it second.
*
* Also, it's possible that the "chain" consists only of a DEAD tuple,
* in which case we should replace the DEAD tuple in-place.
*/
SpGistLeafTuple head;
OffsetNumber offnum;
head = (SpGistLeafTuple) PageGetItem(current->page,
PageGetItemId(current->page, current->offnum));
if (head->tupstate == SPGIST_LIVE)
{
leafTuple->nextOffset = head->nextOffset;
offnum = SpGistPageAddNewItem(state, current->page,
(Item) leafTuple, leafTuple->size,
NULL, false);
/*
* re-get head of list because it could have been moved on page,
* and set new second element
*/
head = (SpGistLeafTuple) PageGetItem(current->page,
PageGetItemId(current->page, current->offnum));
head->nextOffset = offnum;
xlrec.offnumLeaf = offnum;
xlrec.offnumHeadLeaf = current->offnum;
}
else if (head->tupstate == SPGIST_DEAD)
{
leafTuple->nextOffset = InvalidOffsetNumber;
PageIndexTupleDelete(current->page, current->offnum);
if (PageAddItem(current->page,
(Item) leafTuple, leafTuple->size,
current->offnum, false, false) != current->offnum)
elog(ERROR, "failed to add item of size %u to SPGiST index page",
leafTuple->size);
/* WAL replay distinguishes this case by equal offnums */
xlrec.offnumLeaf = current->offnum;
xlrec.offnumHeadLeaf = current->offnum;
}
else
elog(ERROR, "unexpected SPGiST tuple state: %d", head->tupstate);
}
MarkBufferDirty(current->buffer);
if (RelationNeedsWAL(index) && !state->isBuild)
{
XLogRecPtr recptr;
int flags;
XLogBeginInsert();
XLogRegisterData((char *) &xlrec, sizeof(xlrec));
XLogRegisterData((char *) leafTuple, leafTuple->size);
flags = REGBUF_STANDARD;
if (xlrec.newPage)
flags |= REGBUF_WILL_INIT;
XLogRegisterBuffer(0, current->buffer, flags);
if (xlrec.offnumParent != InvalidOffsetNumber)
XLogRegisterBuffer(1, parent->buffer, REGBUF_STANDARD);
recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_LEAF);
PageSetLSN(current->page, recptr);
/* update parent only if we actually changed it */
if (xlrec.offnumParent != InvalidOffsetNumber)
{
PageSetLSN(parent->page, recptr);
}
}
END_CRIT_SECTION();
}
/*
* Count the number and total size of leaf tuples in the chain starting at
* current->offnum. Return number into *nToSplit and total size as function
* result.
*
* Klugy special case when considering the root page (i.e., root is a leaf
* page, but we're about to split for the first time): return fake large
* values to force spgdoinsert() to take the doPickSplit rather than
* moveLeafs code path. moveLeafs is not prepared to deal with root page.
*/
static int
checkSplitConditions(Relation index, SpGistState *state,
SPPageDesc *current, int *nToSplit)
{
int i,
n = 0,
totalSize = 0;
if (SpGistBlockIsRoot(current->blkno))
{
/* return impossible values to force split */
*nToSplit = BLCKSZ;
return BLCKSZ;
}
i = current->offnum;
while (i != InvalidOffsetNumber)
{
SpGistLeafTuple it;
Assert(i >= FirstOffsetNumber &&
i <= PageGetMaxOffsetNumber(current->page));
it = (SpGistLeafTuple) PageGetItem(current->page,
PageGetItemId(current->page, i));
if (it->tupstate == SPGIST_LIVE)
{
n++;
totalSize += it->size + sizeof(ItemIdData);
}
else if (it->tupstate == SPGIST_DEAD)
{
/* We could see a DEAD tuple as first/only chain item */
Assert(i == current->offnum);
Assert(it->nextOffset == InvalidOffsetNumber);
/* Don't count it in result, because it won't go to other page */
}
else
elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
i = it->nextOffset;
}
*nToSplit = n;
return totalSize;
}
/*
* current points to a leaf-tuple chain that we wanted to add newLeafTuple to,
* but the chain has to be moved because there's not enough room to add
* newLeafTuple to its page. We use this method when the chain contains
* very little data so a split would be inefficient. We are sure we can
* fit the chain plus newLeafTuple on one other page.
*/
static void
moveLeafs(Relation index, SpGistState *state,
SPPageDesc *current, SPPageDesc *parent,
SpGistLeafTuple newLeafTuple, bool isNulls)
{
int i,
nDelete,
nInsert,
size;
Buffer nbuf;
Page npage;
SpGistLeafTuple it;
OffsetNumber r = InvalidOffsetNumber,
startOffset = InvalidOffsetNumber;
bool replaceDead = false;
OffsetNumber *toDelete;
OffsetNumber *toInsert;
BlockNumber nblkno;
spgxlogMoveLeafs xlrec;
char *leafdata,
*leafptr;
/* This doesn't work on root page */
Assert(parent->buffer != InvalidBuffer);
Assert(parent->buffer != current->buffer);
/* Locate the tuples to be moved, and count up the space needed */
i = PageGetMaxOffsetNumber(current->page);
toDelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * i);
toInsert = (OffsetNumber *) palloc(sizeof(OffsetNumber) * (i + 1));
size = newLeafTuple->size + sizeof(ItemIdData);
nDelete = 0;
i = current->offnum;
while (i != InvalidOffsetNumber)
{
SpGistLeafTuple it;
Assert(i >= FirstOffsetNumber &&
i <= PageGetMaxOffsetNumber(current->page));
it = (SpGistLeafTuple) PageGetItem(current->page,
PageGetItemId(current->page, i));
if (it->tupstate == SPGIST_LIVE)
{
toDelete[nDelete] = i;
size += it->size + sizeof(ItemIdData);
nDelete++;
}
else if (it->tupstate == SPGIST_DEAD)
{
/* We could see a DEAD tuple as first/only chain item */
Assert(i == current->offnum);
Assert(it->nextOffset == InvalidOffsetNumber);
/* We don't want to move it, so don't count it in size */
toDelete[nDelete] = i;
nDelete++;
replaceDead = true;
}
else
elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
i = it->nextOffset;
}
/* Find a leaf page that will hold them */
nbuf = SpGistGetBuffer(index, GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
size, &xlrec.newPage);
npage = BufferGetPage(nbuf);
nblkno = BufferGetBlockNumber(nbuf);
Assert(nblkno != current->blkno);
leafdata = leafptr = palloc(size);
START_CRIT_SECTION();
/* copy all the old tuples to new page, unless they're dead */
nInsert = 0;
if (!replaceDead)
{
for (i = 0; i < nDelete; i++)
{
it = (SpGistLeafTuple) PageGetItem(current->page,
PageGetItemId(current->page, toDelete[i]));
Assert(it->tupstate == SPGIST_LIVE);
/*
* Update chain link (notice the chain order gets reversed, but we
* don't care). We're modifying the tuple on the source page
* here, but it's okay since we're about to delete it.
*/
it->nextOffset = r;
r = SpGistPageAddNewItem(state, npage, (Item) it, it->size,
&startOffset, false);
toInsert[nInsert] = r;
nInsert++;
/* save modified tuple into leafdata as well */
memcpy(leafptr, it, it->size);
leafptr += it->size;
}
}
/* add the new tuple as well */
newLeafTuple->nextOffset = r;
r = SpGistPageAddNewItem(state, npage,
(Item) newLeafTuple, newLeafTuple->size,
&startOffset, false);
toInsert[nInsert] = r;
nInsert++;
memcpy(leafptr, newLeafTuple, newLeafTuple->size);
leafptr += newLeafTuple->size;
/*
* Now delete the old tuples, leaving a redirection pointer behind for the
* first one, unless we're doing an index build; in which case there can't
* be any concurrent scan so we need not provide a redirect.
*/
spgPageIndexMultiDelete(state, current->page, toDelete, nDelete,
state->isBuild ? SPGIST_PLACEHOLDER : SPGIST_REDIRECT,
SPGIST_PLACEHOLDER,
nblkno, r);
/* Update parent's downlink and mark parent page dirty */
saveNodeLink(index, parent, nblkno, r);
/* Mark the leaf pages too */
MarkBufferDirty(current->buffer);
MarkBufferDirty(nbuf);
if (RelationNeedsWAL(index) && !state->isBuild)
{
XLogRecPtr recptr;
/* prepare WAL info */
STORE_STATE(state, xlrec.stateSrc);
xlrec.nMoves = nDelete;
xlrec.replaceDead = replaceDead;
xlrec.storesNulls = isNulls;
xlrec.offnumParent = parent->offnum;
xlrec.nodeI = parent->node;
XLogBeginInsert();
XLogRegisterData((char *) &xlrec, SizeOfSpgxlogMoveLeafs);
XLogRegisterData((char *) toDelete,
sizeof(OffsetNumber) * nDelete);
XLogRegisterData((char *) toInsert,
sizeof(OffsetNumber) * nInsert);
XLogRegisterData((char *) leafdata, leafptr - leafdata);
XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD);
XLogRegisterBuffer(1, nbuf, REGBUF_STANDARD | (xlrec.newPage ? REGBUF_WILL_INIT : 0));
XLogRegisterBuffer(2, parent->buffer, REGBUF_STANDARD);
recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_MOVE_LEAFS);
PageSetLSN(current->page, recptr);
PageSetLSN(npage, recptr);
PageSetLSN(parent->page, recptr);
}
END_CRIT_SECTION();
/* Update local free-space cache and release new buffer */
SpGistSetLastUsedPage(index, nbuf);
UnlockReleaseBuffer(nbuf);
}
/*
* Update previously-created redirection tuple with appropriate destination
*
* We use this when it's not convenient to know the destination first.
* The tuple should have been made with the "impossible" destination of
* the metapage.
*/
static void
setRedirectionTuple(SPPageDesc *current, OffsetNumber position,
BlockNumber blkno, OffsetNumber offnum)
{
SpGistDeadTuple dt;
dt = (SpGistDeadTuple) PageGetItem(current->page,
PageGetItemId(current->page, position));
Assert(dt->tupstate == SPGIST_REDIRECT);
Assert(ItemPointerGetBlockNumber(&dt->pointer) == SPGIST_METAPAGE_BLKNO);
ItemPointerSet(&dt->pointer, blkno, offnum);
}
/*
* Test to see if the user-defined picksplit function failed to do its job,
* ie, it put all the leaf tuples into the same node.
* If so, randomly divide the tuples into several nodes (all with the same
* label) and return true to select allTheSame mode for this inner tuple.
*
* (This code is also used to forcibly select allTheSame mode for nulls.)
*
* If we know that the leaf tuples wouldn't all fit on one page, then we
* exclude the last tuple (which is the incoming new tuple that forced a split)
* from the check to see if more than one node is used. The reason for this
* is that if the existing tuples are put into only one chain, then even if
* we move them all to an empty page, there would still not be room for the
* new tuple, so we'd get into an infinite loop of picksplit attempts.
* Forcing allTheSame mode dodges this problem by ensuring the old tuples will
* be split across pages. (Exercise for the reader: figure out why this
* fixes the problem even when there is only one old tuple.)
*/
static bool
checkAllTheSame(spgPickSplitIn *in, spgPickSplitOut *out, bool tooBig,
bool *includeNew)
{
int theNode;
int limit;
int i;
/* For the moment, assume we can include the new leaf tuple */
*includeNew = true;
/* If there's only the new leaf tuple, don't select allTheSame mode */
if (in->nTuples <= 1)
return false;
/* If tuple set doesn't fit on one page, ignore the new tuple in test */
limit = tooBig ? in->nTuples - 1 : in->nTuples;
/* Check to see if more than one node is populated */
theNode = out->mapTuplesToNodes[0];
for (i = 1; i < limit; i++)
{
if (out->mapTuplesToNodes[i] != theNode)
return false;
}
/* Nope, so override the picksplit function's decisions */
/* If the new tuple is in its own node, it can't be included in split */
if (tooBig && out->mapTuplesToNodes[in->nTuples - 1] != theNode)
*includeNew = false;
out->nNodes = 8; /* arbitrary number of child nodes */
/* Random assignment of tuples to nodes (note we include new tuple) */
for (i = 0; i < in->nTuples; i++)
out->mapTuplesToNodes[i] = i % out->nNodes;
/* The opclass may not use node labels, but if it does, duplicate 'em */
if (out->nodeLabels)
{
Datum theLabel = out->nodeLabels[theNode];
out->nodeLabels = (Datum *) palloc(sizeof(Datum) * out->nNodes);
for (i = 0; i < out->nNodes; i++)
out->nodeLabels[i] = theLabel;
}
/* We don't touch the prefix or the leaf tuple datum assignments */
return true;
}
/*
* current points to a leaf-tuple chain that we wanted to add newLeafTuple to,
* but the chain has to be split because there's not enough room to add
* newLeafTuple to its page.
*
* This function splits the leaf tuple set according to picksplit's rules,
* creating one or more new chains that are spread across the current page
* and an additional leaf page (we assume that two leaf pages will be
* sufficient). A new inner tuple is created, and the parent downlink
* pointer is updated to point to that inner tuple instead of the leaf chain.
*
* On exit, current contains the address of the new inner tuple.
*
* Returns true if we successfully inserted newLeafTuple during this function,
* false if caller still has to do it (meaning another picksplit operation is
* probably needed). Failure could occur if the picksplit result is fairly
* unbalanced, or if newLeafTuple is just plain too big to fit on a page.
* Because we force the picksplit result to be at least two chains, each
* cycle will get rid of at least one leaf tuple from the chain, so the loop
* will eventually terminate if lack of balance is the issue. If the tuple
* is too big, we assume that repeated picksplit operations will eventually
* make it small enough by repeated prefix-stripping. A broken opclass could
* make this an infinite loop, though.
*/
static bool
doPickSplit(Relation index, SpGistState *state,
SPPageDesc *current, SPPageDesc *parent,
SpGistLeafTuple newLeafTuple,
int level, bool isNulls, bool isNew)
{
bool insertedNew = false;
spgPickSplitIn in;
spgPickSplitOut out;
FmgrInfo *procinfo;
bool includeNew;
int i,
max,
n;
SpGistInnerTuple innerTuple;
SpGistNodeTuple node,
*nodes;
Buffer newInnerBuffer,
newLeafBuffer;
ItemPointerData *heapPtrs;
uint8 *leafPageSelect;
int *leafSizes;
OffsetNumber *toDelete;
OffsetNumber *toInsert;
OffsetNumber redirectTuplePos = InvalidOffsetNumber;
OffsetNumber startOffsets[2];
SpGistLeafTuple *newLeafs;
int spaceToDelete;
int currentFreeSpace;
int totalLeafSizes;
bool allTheSame;
spgxlogPickSplit xlrec;
char *leafdata,
*leafptr;
SPPageDesc saveCurrent;
int nToDelete,
nToInsert,
maxToInclude;
in.level = level;
/*
* Allocate per-leaf-tuple work arrays with max possible size
*/
max = PageGetMaxOffsetNumber(current->page);
n = max + 1;
in.datums = (Datum *) palloc(sizeof(Datum) * n);
heapPtrs = (ItemPointerData *) palloc(sizeof(ItemPointerData) * n);
toDelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n);
toInsert = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n);
newLeafs = (SpGistLeafTuple *) palloc(sizeof(SpGistLeafTuple) * n);
leafPageSelect = (uint8 *) palloc(sizeof(uint8) * n);
STORE_STATE(state, xlrec.stateSrc);
/*
* Form list of leaf tuples which will be distributed as split result;
* also, count up the amount of space that will be freed from current.
* (Note that in the non-root case, we won't actually delete the old
* tuples, only replace them with redirects or placeholders.)
*
* Note: the SGLTDATUM calls here are safe even when dealing with a nulls
* page. For a pass-by-value data type we will fetch a word that must
* exist even though it may contain garbage (because of the fact that leaf
* tuples must have size at least SGDTSIZE). For a pass-by-reference type
* we are just computing a pointer that isn't going to get dereferenced.
* So it's not worth guarding the calls with isNulls checks.
*/
nToInsert = 0;
nToDelete = 0;
spaceToDelete = 0;
if (SpGistBlockIsRoot(current->blkno))
{
/*
* We are splitting the root (which up to now is also a leaf page).
* Its tuples are not linked, so scan sequentially to get them all. We
* ignore the original value of current->offnum.
*/
for (i = FirstOffsetNumber; i <= max; i++)
{
SpGistLeafTuple it;
it = (SpGistLeafTuple) PageGetItem(current->page,
PageGetItemId(current->page, i));
if (it->tupstate == SPGIST_LIVE)
{
in.datums[nToInsert] = SGLTDATUM(it, state);
heapPtrs[nToInsert] = it->heapPtr;
nToInsert++;
toDelete[nToDelete] = i;
nToDelete++;
/* we will delete the tuple altogether, so count full space */
spaceToDelete += it->size + sizeof(ItemIdData);
}
else /* tuples on root should be live */
elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
}
}
else
{
/* Normal case, just collect the leaf tuples in the chain */
i = current->offnum;
while (i != InvalidOffsetNumber)
{
SpGistLeafTuple it;
Assert(i >= FirstOffsetNumber && i <= max);
it = (SpGistLeafTuple) PageGetItem(current->page,
PageGetItemId(current->page, i));
if (it->tupstate == SPGIST_LIVE)
{
in.datums[nToInsert] = SGLTDATUM(it, state);
heapPtrs[nToInsert] = it->heapPtr;
nToInsert++;
toDelete[nToDelete] = i;
nToDelete++;
/* we will not delete the tuple, only replace with dead */
Assert(it->size >= SGDTSIZE);
spaceToDelete += it->size - SGDTSIZE;
}
else if (it->tupstate == SPGIST_DEAD)
{
/* We could see a DEAD tuple as first/only chain item */
Assert(i == current->offnum);
Assert(it->nextOffset == InvalidOffsetNumber);
toDelete[nToDelete] = i;
nToDelete++;
/* replacing it with redirect will save no space */
}
else
elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
i = it->nextOffset;
}
}
in.nTuples = nToInsert;
/*
* We may not actually insert new tuple because another picksplit may be
* necessary due to too large value, but we will try to allocate enough
* space to include it; and in any case it has to be included in the input
* for the picksplit function. So don't increment nToInsert yet.
*/
in.datums[in.nTuples] = SGLTDATUM(newLeafTuple, state);
heapPtrs[in.nTuples] = newLeafTuple->heapPtr;
in.nTuples++;
memset(&out, 0, sizeof(out));
if (!isNulls)
{
/*
* Perform split using user-defined method.
*/
procinfo = index_getprocinfo(index, 1, SPGIST_PICKSPLIT_PROC);
FunctionCall2Coll(procinfo,
index->rd_indcollation[0],
PointerGetDatum(&in),
PointerGetDatum(&out));
/*
* Form new leaf tuples and count up the total space needed.
*/
totalLeafSizes = 0;
for (i = 0; i < in.nTuples; i++)
{
newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i,
out.leafTupleDatums[i],
false);
totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
}
}
else
{
/*
* Perform dummy split that puts all tuples into one node.
* checkAllTheSame will override this and force allTheSame mode.
*/
out.hasPrefix = false;
out.nNodes = 1;
out.nodeLabels = NULL;
out.mapTuplesToNodes = palloc0(sizeof(int) * in.nTuples);
/*
* Form new leaf tuples and count up the total space needed.
*/
totalLeafSizes = 0;
for (i = 0; i < in.nTuples; i++)
{
newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i,
(Datum) 0,
true);
totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
}
}
/*
* Check to see if the picksplit function failed to separate the values,
* ie, it put them all into the same child node. If so, select allTheSame
* mode and create a random split instead. See comments for
* checkAllTheSame as to why we need to know if the new leaf tuples could
* fit on one page.
*/
allTheSame = checkAllTheSame(&in, &out,
totalLeafSizes > SPGIST_PAGE_CAPACITY,
&includeNew);
/*
* If checkAllTheSame decided we must exclude the new tuple, don't
* consider it any further.
*/
if (includeNew)
maxToInclude = in.nTuples;
else
{
maxToInclude = in.nTuples - 1;
totalLeafSizes -= newLeafs[in.nTuples - 1]->size + sizeof(ItemIdData);
}
/*
* Allocate per-node work arrays. Since checkAllTheSame could replace
* out.nNodes with a value larger than the number of tuples on the input
* page, we can't allocate these arrays before here.
*/
nodes = (SpGistNodeTuple *) palloc(sizeof(SpGistNodeTuple) * out.nNodes);
leafSizes = (int *) palloc0(sizeof(int) * out.nNodes);
/*
* Form nodes of inner tuple and inner tuple itself
*/
for (i = 0; i < out.nNodes; i++)
{
Datum label = (Datum) 0;
bool labelisnull = (out.nodeLabels == NULL);
if (!labelisnull)
label = out.nodeLabels[i];
nodes[i] = spgFormNodeTuple(state, label, labelisnull);
}
innerTuple = spgFormInnerTuple(state,
out.hasPrefix, out.prefixDatum,
out.nNodes, nodes);
innerTuple->allTheSame = allTheSame;
/*
* Update nodes[] array to point into the newly formed innerTuple, so that
* we can adjust their downlinks below.
*/
SGITITERATE(innerTuple, i, node)
{
nodes[i] = node;
}
/*
* Re-scan new leaf tuples and count up the space needed under each node.
*/
for (i = 0; i < maxToInclude; i++)
{
n = out.mapTuplesToNodes[i];
if (n < 0 || n >= out.nNodes)
elog(ERROR, "inconsistent result of SPGiST picksplit function");
leafSizes[n] += newLeafs[i]->size + sizeof(ItemIdData);
}
/*
* To perform the split, we must insert a new inner tuple, which can't go
* on a leaf page; and unless we are splitting the root page, we must then
* update the parent tuple's downlink to point to the inner tuple. If
* there is room, we'll put the new inner tuple on the same page as the
* parent tuple, otherwise we need another non-leaf buffer. But if the
* parent page is the root, we can't add the new inner tuple there,
* because the root page must have only one inner tuple.
*/
xlrec.initInner = false;
if (parent->buffer != InvalidBuffer &&
!SpGistBlockIsRoot(parent->blkno) &&
(SpGistPageGetFreeSpace(parent->page, 1) >=
innerTuple->size + sizeof(ItemIdData)))
{
/* New inner tuple will fit on parent page */
newInnerBuffer = parent->buffer;
}
else if (parent->buffer != InvalidBuffer)
{
/* Send tuple to page with next triple parity (see README) */
newInnerBuffer = SpGistGetBuffer(index,
GBUF_INNER_PARITY(parent->blkno + 1) |
(isNulls ? GBUF_NULLS : 0),
innerTuple->size + sizeof(ItemIdData),
&xlrec.initInner);
}
else
{
/* Root page split ... inner tuple will go to root page */
newInnerBuffer = InvalidBuffer;
}
/*
* The new leaf tuples converted from the existing ones should require the
* same or less space, and therefore should all fit onto one page
* (although that's not necessarily the current page, since we can't
* delete the old tuples but only replace them with placeholders).
* However, the incoming new tuple might not also fit, in which case we
* might need another picksplit cycle to reduce it some more.
*
* If there's not room to put everything back onto the current page, then
* we decide on a per-node basis which tuples go to the new page. (We do
* it like that because leaf tuple chains can't cross pages, so we must
* place all leaf tuples belonging to the same parent node on the same
* page.)
*
* If we are splitting the root page (turning it from a leaf page into an
* inner page), then no leaf tuples can go back to the current page; they
* must all go somewhere else.
*/
if (!SpGistBlockIsRoot(current->blkno))
currentFreeSpace = PageGetExactFreeSpace(current->page) + spaceToDelete;
else
currentFreeSpace = 0; /* prevent assigning any tuples to current */
xlrec.initDest = false;
if (totalLeafSizes <= currentFreeSpace)
{
/* All the leaf tuples will fit on current page */
newLeafBuffer = InvalidBuffer;
/* mark new leaf tuple as included in insertions, if allowed */
if (includeNew)
{
nToInsert++;
insertedNew = true;
}
for (i = 0; i < nToInsert; i++)
leafPageSelect[i] = 0; /* signifies current page */
}
else if (in.nTuples == 1 && totalLeafSizes > SPGIST_PAGE_CAPACITY)
{
/*
* We're trying to split up a long value by repeated suffixing, but
* it's not going to fit yet. Don't bother allocating a second leaf
* buffer that we won't be able to use.
*/
newLeafBuffer = InvalidBuffer;
Assert(includeNew);
Assert(nToInsert == 0);
}
else
{
/* We will need another leaf page */
uint8 *nodePageSelect;
int curspace;
int newspace;
newLeafBuffer = SpGistGetBuffer(index,
GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
Min(totalLeafSizes,
SPGIST_PAGE_CAPACITY),
&xlrec.initDest);
/*
* Attempt to assign node groups to the two pages. We might fail to
* do so, even if totalLeafSizes is less than the available space,
* because we can't split a group across pages.
*/
nodePageSelect = (uint8 *) palloc(sizeof(uint8) * out.nNodes);
curspace = currentFreeSpace;
newspace = PageGetExactFreeSpace(BufferGetPage(newLeafBuffer));
for (i = 0; i < out.nNodes; i++)
{
if (leafSizes[i] <= curspace)
{
nodePageSelect[i] = 0; /* signifies current page */
curspace -= leafSizes[i];
}
else
{
nodePageSelect[i] = 1; /* signifies new leaf page */
newspace -= leafSizes[i];
}
}
if (curspace >= 0 && newspace >= 0)
{
/* Successful assignment, so we can include the new leaf tuple */
if (includeNew)
{
nToInsert++;
insertedNew = true;
}
}
else if (includeNew)
{
/* We must exclude the new leaf tuple from the split */
int nodeOfNewTuple = out.mapTuplesToNodes[in.nTuples - 1];
leafSizes[nodeOfNewTuple] -=
newLeafs[in.nTuples - 1]->size + sizeof(ItemIdData);
/* Repeat the node assignment process --- should succeed now */
curspace = currentFreeSpace;
newspace = PageGetExactFreeSpace(BufferGetPage(newLeafBuffer));
for (i = 0; i < out.nNodes; i++)
{
if (leafSizes[i] <= curspace)
{
nodePageSelect[i] = 0; /* signifies current page */
curspace -= leafSizes[i];
}
else
{
nodePageSelect[i] = 1; /* signifies new leaf page */
newspace -= leafSizes[i];
}
}
if (curspace < 0 || newspace < 0)
elog(ERROR, "failed to divide leaf tuple groups across pages");
}
else
{
/* oops, we already excluded new tuple ... should not get here */
elog(ERROR, "failed to divide leaf tuple groups across pages");
}
/* Expand the per-node assignments to be shown per leaf tuple */
for (i = 0; i < nToInsert; i++)
{
n = out.mapTuplesToNodes[i];
leafPageSelect[i] = nodePageSelect[n];
}
}
/* Start preparing WAL record */
xlrec.nDelete = 0;
xlrec.initSrc = isNew;
xlrec.storesNulls = isNulls;
xlrec.isRootSplit = SpGistBlockIsRoot(current->blkno);
leafdata = leafptr = (char *) palloc(totalLeafSizes);
/* Here we begin making the changes to the target pages */
START_CRIT_SECTION();
/*
* Delete old leaf tuples from current buffer, except when we're splitting
* the root; in that case there's no need because we'll re-init the page
* below. We do this first to make room for reinserting new leaf tuples.
*/
if (!SpGistBlockIsRoot(current->blkno))
{
/*
* Init buffer instead of deleting individual tuples, but only if
* there aren't any other live tuples and only during build; otherwise
* we need to set a redirection tuple for concurrent scans.
*/
if (state->isBuild &&
nToDelete + SpGistPageGetOpaque(current->page)->nPlaceholder ==
PageGetMaxOffsetNumber(current->page))
{
SpGistInitBuffer(current->buffer,
SPGIST_LEAF | (isNulls ? SPGIST_NULLS : 0));
xlrec.initSrc = true;
}
else if (isNew)
{
/* don't expose the freshly init'd buffer as a backup block */
Assert(nToDelete == 0);
}
else
{
xlrec.nDelete = nToDelete;
if (!state->isBuild)
{
/*
* Need to create redirect tuple (it will point to new inner
* tuple) but right now the new tuple's location is not known
* yet. So, set the redirection pointer to "impossible" value
* and remember its position to update tuple later.
*/
if (nToDelete > 0)
redirectTuplePos = toDelete[0];
spgPageIndexMultiDelete(state, current->page,
toDelete, nToDelete,
SPGIST_REDIRECT,
SPGIST_PLACEHOLDER,
SPGIST_METAPAGE_BLKNO,
FirstOffsetNumber);
}
else
{
/*
* During index build there is not concurrent searches, so we
* don't need to create redirection tuple.
*/
spgPageIndexMultiDelete(state, current->page,
toDelete, nToDelete,
SPGIST_PLACEHOLDER,
SPGIST_PLACEHOLDER,
InvalidBlockNumber,
InvalidOffsetNumber);
}
}
}
/*
* Put leaf tuples on proper pages, and update downlinks in innerTuple's
* nodes.
*/
startOffsets[0] = startOffsets[1] = InvalidOffsetNumber;
for (i = 0; i < nToInsert; i++)
{
SpGistLeafTuple it = newLeafs[i];
Buffer leafBuffer;
BlockNumber leafBlock;
OffsetNumber newoffset;
/* Which page is it going to? */
leafBuffer = leafPageSelect[i] ? newLeafBuffer : current->buffer;
leafBlock = BufferGetBlockNumber(leafBuffer);
/* Link tuple into correct chain for its node */
n = out.mapTuplesToNodes[i];
if (ItemPointerIsValid(&nodes[n]->t_tid))
{
Assert(ItemPointerGetBlockNumber(&nodes[n]->t_tid) == leafBlock);
it->nextOffset = ItemPointerGetOffsetNumber(&nodes[n]->t_tid);
}
else
it->nextOffset = InvalidOffsetNumber;
/* Insert it on page */
newoffset = SpGistPageAddNewItem(state, BufferGetPage(leafBuffer),
(Item) it, it->size,
&startOffsets[leafPageSelect[i]],
false);
toInsert[i] = newoffset;
/* ... and complete the chain linking */
ItemPointerSet(&nodes[n]->t_tid, leafBlock, newoffset);
/* Also copy leaf tuple into WAL data */
memcpy(leafptr, newLeafs[i], newLeafs[i]->size);
leafptr += newLeafs[i]->size;
}
/*
* We're done modifying the other leaf buffer (if any), so mark it dirty.
* current->buffer will be marked below, after we're entirely done
* modifying it.
*/
if (newLeafBuffer != InvalidBuffer)
{
MarkBufferDirty(newLeafBuffer);
}
/* Remember current buffer, since we're about to change "current" */
saveCurrent = *current;
/*
* Store the new innerTuple
*/
if (newInnerBuffer == parent->buffer && newInnerBuffer != InvalidBuffer)
{
/*
* new inner tuple goes to parent page
*/
Assert(current->buffer != parent->buffer);
/* Repoint "current" at the new inner tuple */
current->blkno = parent->blkno;
current->buffer = parent->buffer;
current->page = parent->page;
xlrec.offnumInner = current->offnum =
SpGistPageAddNewItem(state, current->page,
(Item) innerTuple, innerTuple->size,
NULL, false);
/*
* Update parent node link and mark parent page dirty
*/
xlrec.innerIsParent = true;
xlrec.offnumParent = parent->offnum;
xlrec.nodeI = parent->node;
saveNodeLink(index, parent, current->blkno, current->offnum);
/*
* Update redirection link (in old current buffer)
*/
if (redirectTuplePos != InvalidOffsetNumber)
setRedirectionTuple(&saveCurrent, redirectTuplePos,
current->blkno, current->offnum);
/* Done modifying old current buffer, mark it dirty */
MarkBufferDirty(saveCurrent.buffer);
}
else if (parent->buffer != InvalidBuffer)
{
/*
* new inner tuple will be stored on a new page
*/
Assert(newInnerBuffer != InvalidBuffer);
/* Repoint "current" at the new inner tuple */
current->buffer = newInnerBuffer;
current->blkno = BufferGetBlockNumber(current->buffer);
current->page = BufferGetPage(current->buffer);
xlrec.offnumInner = current->offnum =
SpGistPageAddNewItem(state, current->page,
(Item) innerTuple, innerTuple->size,
NULL, false);
/* Done modifying new current buffer, mark it dirty */
MarkBufferDirty(current->buffer);
/*
* Update parent node link and mark parent page dirty
*/
xlrec.innerIsParent = (parent->buffer == current->buffer);
xlrec.offnumParent = parent->offnum;
xlrec.nodeI = parent->node;
saveNodeLink(index, parent, current->blkno, current->offnum);
/*
* Update redirection link (in old current buffer)
*/
if (redirectTuplePos != InvalidOffsetNumber)
setRedirectionTuple(&saveCurrent, redirectTuplePos,
current->blkno, current->offnum);
/* Done modifying old current buffer, mark it dirty */
MarkBufferDirty(saveCurrent.buffer);
}
else
{
/*
* Splitting root page, which was a leaf but now becomes inner page
* (and so "current" continues to point at it)
*/
Assert(SpGistBlockIsRoot(current->blkno));
Assert(redirectTuplePos == InvalidOffsetNumber);
SpGistInitBuffer(current->buffer, (isNulls ? SPGIST_NULLS : 0));
xlrec.initInner = true;
xlrec.innerIsParent = false;
xlrec.offnumInner = current->offnum =
PageAddItem(current->page, (Item) innerTuple, innerTuple->size,
InvalidOffsetNumber, false, false);
if (current->offnum != FirstOffsetNumber)
elog(ERROR, "failed to add item of size %u to SPGiST index page",
innerTuple->size);
/* No parent link to update, nor redirection to do */
xlrec.offnumParent = InvalidOffsetNumber;
xlrec.nodeI = 0;
/* Done modifying new current buffer, mark it dirty */
MarkBufferDirty(current->buffer);
/* saveCurrent doesn't represent a different buffer */
saveCurrent.buffer = InvalidBuffer;
}
if (RelationNeedsWAL(index) && !state->isBuild)
{
XLogRecPtr recptr;
int flags;
XLogBeginInsert();
xlrec.nInsert = nToInsert;
XLogRegisterData((char *) &xlrec, SizeOfSpgxlogPickSplit);
XLogRegisterData((char *) toDelete,
sizeof(OffsetNumber) * xlrec.nDelete);
XLogRegisterData((char *) toInsert,
sizeof(OffsetNumber) * xlrec.nInsert);
XLogRegisterData((char *) leafPageSelect,
sizeof(uint8) * xlrec.nInsert);
XLogRegisterData((char *) innerTuple, innerTuple->size);
XLogRegisterData(leafdata, leafptr - leafdata);
/* Old leaf page */
if (BufferIsValid(saveCurrent.buffer))
{
flags = REGBUF_STANDARD;
if (xlrec.initSrc)
flags |= REGBUF_WILL_INIT;
XLogRegisterBuffer(0, saveCurrent.buffer, flags);
}
/* New leaf page */
if (BufferIsValid(newLeafBuffer))
{
flags = REGBUF_STANDARD;
if (xlrec.initDest)
flags |= REGBUF_WILL_INIT;
XLogRegisterBuffer(1, newLeafBuffer, flags);
}
/* Inner page */
flags = REGBUF_STANDARD;
if (xlrec.initInner)
flags |= REGBUF_WILL_INIT;
XLogRegisterBuffer(2, current->buffer, flags);
/* Parent page, if different from inner page */
if (parent->buffer != InvalidBuffer)
{
if (parent->buffer != current->buffer)
XLogRegisterBuffer(3, parent->buffer, REGBUF_STANDARD);
else
Assert(xlrec.innerIsParent);
}
/* Issue the WAL record */
recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_PICKSPLIT);
/* Update page LSNs on all affected pages */
if (newLeafBuffer != InvalidBuffer)
{
Page page = BufferGetPage(newLeafBuffer);
PageSetLSN(page, recptr);
}
if (saveCurrent.buffer != InvalidBuffer)
{
Page page = BufferGetPage(saveCurrent.buffer);
PageSetLSN(page, recptr);
}
PageSetLSN(current->page, recptr);
if (parent->buffer != InvalidBuffer)
{
PageSetLSN(parent->page, recptr);
}
}
END_CRIT_SECTION();
/* Update local free-space cache and unlock buffers */
if (newLeafBuffer != InvalidBuffer)
{
SpGistSetLastUsedPage(index, newLeafBuffer);
UnlockReleaseBuffer(newLeafBuffer);
}
if (saveCurrent.buffer != InvalidBuffer)
{
SpGistSetLastUsedPage(index, saveCurrent.buffer);
UnlockReleaseBuffer(saveCurrent.buffer);
}
return insertedNew;
}
/*
* spgMatchNode action: descend to N'th child node of current inner tuple
*/
static void
spgMatchNodeAction(Relation index, SpGistState *state,
SpGistInnerTuple innerTuple,
SPPageDesc *current, SPPageDesc *parent, int nodeN)
{
int i;
SpGistNodeTuple node;
/* Release previous parent buffer if any */
if (parent->buffer != InvalidBuffer &&
parent->buffer != current->buffer)
{
SpGistSetLastUsedPage(index, parent->buffer);
UnlockReleaseBuffer(parent->buffer);
}
/* Repoint parent to specified node of current inner tuple */
parent->blkno = current->blkno;
parent->buffer = current->buffer;
parent->page = current->page;
parent->offnum = current->offnum;
parent->node = nodeN;
/* Locate that node */
SGITITERATE(innerTuple, i, node)
{
if (i == nodeN)
break;
}
if (i != nodeN)
elog(ERROR, "failed to find requested node %d in SPGiST inner tuple",
nodeN);
/* Point current to the downlink location, if any */
if (ItemPointerIsValid(&node->t_tid))
{
current->blkno = ItemPointerGetBlockNumber(&node->t_tid);
current->offnum = ItemPointerGetOffsetNumber(&node->t_tid);
}
else
{
/* Downlink is empty, so we'll need to find a new page */
current->blkno = InvalidBlockNumber;
current->offnum = InvalidOffsetNumber;
}
current->buffer = InvalidBuffer;
current->page = NULL;
}
/*
* spgAddNode action: add a node to the inner tuple at current
*/
static void
spgAddNodeAction(Relation index, SpGistState *state,
SpGistInnerTuple innerTuple,
SPPageDesc *current, SPPageDesc *parent,
int nodeN, Datum nodeLabel)
{
SpGistInnerTuple newInnerTuple;
spgxlogAddNode xlrec;
/* Should not be applied to nulls */
Assert(!SpGistPageStoresNulls(current->page));
/* Construct new inner tuple with additional node */
newInnerTuple = addNode(state, innerTuple, nodeLabel, nodeN);
/* Prepare WAL record */
STORE_STATE(state, xlrec.stateSrc);
xlrec.offnum = current->offnum;
/* we don't fill these unless we need to change the parent downlink */
xlrec.parentBlk = -1;
xlrec.offnumParent = InvalidOffsetNumber;
xlrec.nodeI = 0;
/* we don't fill these unless tuple has to be moved */
xlrec.offnumNew = InvalidOffsetNumber;
xlrec.newPage = false;
if (PageGetExactFreeSpace(current->page) >=
newInnerTuple->size - innerTuple->size)
{
/*
* We can replace the inner tuple by new version in-place
*/
START_CRIT_SECTION();
PageIndexTupleDelete(current->page, current->offnum);
if (PageAddItem(current->page,
(Item) newInnerTuple, newInnerTuple->size,
current->offnum, false, false) != current->offnum)
elog(ERROR, "failed to add item of size %u to SPGiST index page",
newInnerTuple->size);
MarkBufferDirty(current->buffer);
if (RelationNeedsWAL(index) && !state->isBuild)
{
XLogRecPtr recptr;
XLogBeginInsert();
XLogRegisterData((char *) &xlrec, sizeof(xlrec));
XLogRegisterData((char *) newInnerTuple, newInnerTuple->size);
XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD);
recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE);
PageSetLSN(current->page, recptr);
}
END_CRIT_SECTION();
}
else
{
/*
* move inner tuple to another page, and update parent
*/
SpGistDeadTuple dt;
SPPageDesc saveCurrent;
/*
* It should not be possible to get here for the root page, since we
* allow only one inner tuple on the root page, and spgFormInnerTuple
* always checks that inner tuples don't exceed the size of a page.
*/
if (SpGistBlockIsRoot(current->blkno))
elog(ERROR, "cannot enlarge root tuple any more");
Assert(parent->buffer != InvalidBuffer);
saveCurrent = *current;
xlrec.offnumParent = parent->offnum;
xlrec.nodeI = parent->node;
/*
* obtain new buffer with the same parity as current, since it will be
* a child of same parent tuple
*/
current->buffer = SpGistGetBuffer(index,
GBUF_INNER_PARITY(current->blkno),
newInnerTuple->size + sizeof(ItemIdData),
&xlrec.newPage);
current->blkno = BufferGetBlockNumber(current->buffer);
current->page = BufferGetPage(current->buffer);
/*
* Let's just make real sure new current isn't same as old. Right now
* that's impossible, but if SpGistGetBuffer ever got smart enough to
* delete placeholder tuples before checking space, maybe it wouldn't
* be impossible. The case would appear to work except that WAL
* replay would be subtly wrong, so I think a mere assert isn't enough
* here.
*/
if (current->blkno == saveCurrent.blkno)
elog(ERROR, "SPGiST new buffer shouldn't be same as old buffer");
/*
* New current and parent buffer will both be modified; but note that
* parent buffer could be same as either new or old current.
*/
if (parent->buffer == saveCurrent.buffer)
xlrec.parentBlk = 0;
else if (parent->buffer == current->buffer)
xlrec.parentBlk = 1;
else
xlrec.parentBlk = 2;
START_CRIT_SECTION();
/* insert new ... */
xlrec.offnumNew = current->offnum =
SpGistPageAddNewItem(state, current->page,
(Item) newInnerTuple, newInnerTuple->size,
NULL, false);
MarkBufferDirty(current->buffer);
/* update parent's downlink and mark parent page dirty */
saveNodeLink(index, parent, current->blkno, current->offnum);
/*
* Replace old tuple with a placeholder or redirection tuple. Unless
* doing an index build, we have to insert a redirection tuple for
* possible concurrent scans. We can't just delete it in any case,
* because that could change the offsets of other tuples on the page,
* breaking downlinks from their parents.
*/
if (state->isBuild)
dt = spgFormDeadTuple(state, SPGIST_PLACEHOLDER,
InvalidBlockNumber, InvalidOffsetNumber);
else
dt = spgFormDeadTuple(state, SPGIST_REDIRECT,
current->blkno, current->offnum);
PageIndexTupleDelete(saveCurrent.page, saveCurrent.offnum);
if (PageAddItem(saveCurrent.page, (Item) dt, dt->size,
saveCurrent.offnum,
false, false) != saveCurrent.offnum)
elog(ERROR, "failed to add item of size %u to SPGiST index page",
dt->size);
if (state->isBuild)
SpGistPageGetOpaque(saveCurrent.page)->nPlaceholder++;
else
SpGistPageGetOpaque(saveCurrent.page)->nRedirection++;
MarkBufferDirty(saveCurrent.buffer);
if (RelationNeedsWAL(index) && !state->isBuild)
{
XLogRecPtr recptr;
int flags;
XLogBeginInsert();
/* orig page */
XLogRegisterBuffer(0, saveCurrent.buffer, REGBUF_STANDARD);
/* new page */
flags = REGBUF_STANDARD;
if (xlrec.newPage)
flags |= REGBUF_WILL_INIT;
XLogRegisterBuffer(1, current->buffer, flags);
/* parent page (if different from orig and new) */
if (xlrec.parentBlk == 2)
XLogRegisterBuffer(2, parent->buffer, REGBUF_STANDARD);
XLogRegisterData((char *) &xlrec, sizeof(xlrec));
XLogRegisterData((char *) newInnerTuple, newInnerTuple->size);
recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE);
/* we don't bother to check if any of these are redundant */
PageSetLSN(current->page, recptr);
PageSetLSN(parent->page, recptr);
PageSetLSN(saveCurrent.page, recptr);
}
END_CRIT_SECTION();
/* Release saveCurrent if it's not same as current or parent */
if (saveCurrent.buffer != current->buffer &&
saveCurrent.buffer != parent->buffer)
{
SpGistSetLastUsedPage(index, saveCurrent.buffer);
UnlockReleaseBuffer(saveCurrent.buffer);
}
}
}
/*
* spgSplitNode action: split inner tuple at current into prefix and postfix
*/
static void
spgSplitNodeAction(Relation index, SpGistState *state,
SpGistInnerTuple innerTuple,
SPPageDesc *current, spgChooseOut *out)
{
SpGistInnerTuple prefixTuple,
postfixTuple;
SpGistNodeTuple node,
*nodes;
BlockNumber postfixBlkno;
OffsetNumber postfixOffset;
int i;
spgxlogSplitTuple xlrec;
Buffer newBuffer = InvalidBuffer;
/* Should not be applied to nulls */
Assert(!SpGistPageStoresNulls(current->page));
/* Check opclass gave us sane values */
if (out->result.splitTuple.prefixNNodes <= 0 ||
out->result.splitTuple.prefixNNodes > SGITMAXNNODES)
elog(ERROR, "invalid number of prefix nodes: %d",
out->result.splitTuple.prefixNNodes);
if (out->result.splitTuple.childNodeN < 0 ||
out->result.splitTuple.childNodeN >=
out->result.splitTuple.prefixNNodes)
elog(ERROR, "invalid child node number: %d",
out->result.splitTuple.childNodeN);
/*
* Construct new prefix tuple with requested number of nodes. We'll fill
* in the childNodeN'th node's downlink below.
*/
nodes = (SpGistNodeTuple *) palloc(sizeof(SpGistNodeTuple) *
out->result.splitTuple.prefixNNodes);
for (i = 0; i < out->result.splitTuple.prefixNNodes; i++)
{
Datum label = (Datum) 0;
bool labelisnull;
labelisnull = (out->result.splitTuple.prefixNodeLabels == NULL);
if (!labelisnull)
label = out->result.splitTuple.prefixNodeLabels[i];
nodes[i] = spgFormNodeTuple(state, label, labelisnull);
}
prefixTuple = spgFormInnerTuple(state,
out->result.splitTuple.prefixHasPrefix,
out->result.splitTuple.prefixPrefixDatum,
out->result.splitTuple.prefixNNodes,
nodes);
/* it must fit in the space that innerTuple now occupies */
if (prefixTuple->size > innerTuple->size)
elog(ERROR, "SPGiST inner-tuple split must not produce longer prefix");
/*
* Construct new postfix tuple, containing all nodes of innerTuple with
* same node datums, but with the prefix specified by the picksplit
* function.
*/
nodes = palloc(sizeof(SpGistNodeTuple) * innerTuple->nNodes);
SGITITERATE(innerTuple, i, node)
{
nodes[i] = node;
}
postfixTuple = spgFormInnerTuple(state,
out->result.splitTuple.postfixHasPrefix,
out->result.splitTuple.postfixPrefixDatum,
innerTuple->nNodes, nodes);
/* Postfix tuple is allTheSame if original tuple was */
postfixTuple->allTheSame = innerTuple->allTheSame;
/* prep data for WAL record */
xlrec.newPage = false;
/*
* If we can't fit both tuples on the current page, get a new page for the
* postfix tuple. In particular, can't split to the root page.
*
* For the space calculation, note that prefixTuple replaces innerTuple
* but postfixTuple will be a new entry.
*/
if (SpGistBlockIsRoot(current->blkno) ||
SpGistPageGetFreeSpace(current->page, 1) + innerTuple->size <
prefixTuple->size + postfixTuple->size + sizeof(ItemIdData))
{
/*
* Choose page with next triple parity, because postfix tuple is a
* child of prefix one
*/
newBuffer = SpGistGetBuffer(index,
GBUF_INNER_PARITY(current->blkno + 1),
postfixTuple->size + sizeof(ItemIdData),
&xlrec.newPage);
}
START_CRIT_SECTION();
/*
* Replace old tuple by prefix tuple
*/
PageIndexTupleDelete(current->page, current->offnum);
xlrec.offnumPrefix = PageAddItem(current->page,
(Item) prefixTuple, prefixTuple->size,
current->offnum, false, false);
if (xlrec.offnumPrefix != current->offnum)
elog(ERROR, "failed to add item of size %u to SPGiST index page",
prefixTuple->size);
/*
* put postfix tuple into appropriate page
*/
if (newBuffer == InvalidBuffer)
{
postfixBlkno = current->blkno;
xlrec.offnumPostfix = postfixOffset =
SpGistPageAddNewItem(state, current->page,
(Item) postfixTuple, postfixTuple->size,
NULL, false);
xlrec.postfixBlkSame = true;
}
else
{
postfixBlkno = BufferGetBlockNumber(newBuffer);
xlrec.offnumPostfix = postfixOffset =
SpGistPageAddNewItem(state, BufferGetPage(newBuffer),
(Item) postfixTuple, postfixTuple->size,
NULL, false);
MarkBufferDirty(newBuffer);
xlrec.postfixBlkSame = false;
}
/*
* And set downlink pointer in the prefix tuple to point to postfix tuple.
* (We can't avoid this step by doing the above two steps in opposite
* order, because there might not be enough space on the page to insert
* the postfix tuple first.) We have to update the local copy of the
* prefixTuple too, because that's what will be written to WAL.
*/
spgUpdateNodeLink(prefixTuple, out->result.splitTuple.childNodeN,
postfixBlkno, postfixOffset);
prefixTuple = (SpGistInnerTuple) PageGetItem(current->page,
PageGetItemId(current->page, current->offnum));
spgUpdateNodeLink(prefixTuple, out->result.splitTuple.childNodeN,
postfixBlkno, postfixOffset);
MarkBufferDirty(current->buffer);
if (RelationNeedsWAL(index) && !state->isBuild)
{
XLogRecPtr recptr;
XLogBeginInsert();
XLogRegisterData((char *) &xlrec, sizeof(xlrec));
XLogRegisterData((char *) prefixTuple, prefixTuple->size);
XLogRegisterData((char *) postfixTuple, postfixTuple->size);
XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD);
if (newBuffer != InvalidBuffer)
{
int flags;
flags = REGBUF_STANDARD;
if (xlrec.newPage)
flags |= REGBUF_WILL_INIT;
XLogRegisterBuffer(1, newBuffer, flags);
}
recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_SPLIT_TUPLE);
PageSetLSN(current->page, recptr);
if (newBuffer != InvalidBuffer)
{
PageSetLSN(BufferGetPage(newBuffer), recptr);
}
}
END_CRIT_SECTION();
/* Update local free-space cache and release buffer */
if (newBuffer != InvalidBuffer)
{
SpGistSetLastUsedPage(index, newBuffer);
UnlockReleaseBuffer(newBuffer);
}
}
/*
* Insert one item into the index.
*
* Returns true on success, false if we failed to complete the insertion
* because of conflict with a concurrent insert. In the latter case,
* caller should re-call spgdoinsert() with the same args.
*/
bool
spgdoinsert(Relation index, SpGistState *state,
ItemPointer heapPtr, Datum datum, bool isnull)
{
int level = 0;
Datum leafDatum;
int leafSize;
SPPageDesc current,
parent;
FmgrInfo *procinfo = NULL;
/*
* Look up FmgrInfo of the user-defined choose function once, to save
* cycles in the loop below.
*/
if (!isnull)
procinfo = index_getprocinfo(index, 1, SPGIST_CHOOSE_PROC);
/*
* Prepare the leaf datum to insert.
*
* If an optional "compress" method is provided, then call it to form the
* leaf datum from the input datum. Otherwise store the input datum as
* is. Since we don't use index_form_tuple in this AM, we have to make
* sure value to be inserted is not toasted; FormIndexDatum doesn't
* guarantee that. But we assume the "compress" method to return an
* untoasted value.
*/
if (!isnull)
{
if (OidIsValid(index_getprocid(index, 1, SPGIST_COMPRESS_PROC)))
{
FmgrInfo *compressProcinfo = NULL;
compressProcinfo = index_getprocinfo(index, 1, SPGIST_COMPRESS_PROC);
leafDatum = FunctionCall1Coll(compressProcinfo,
index->rd_indcollation[0],
datum);
}
else
{
Assert(state->attLeafType.type == state->attType.type);
if (state->attType.attlen == -1)
leafDatum = PointerGetDatum(PG_DETOAST_DATUM(datum));
else
leafDatum = datum;
}
}
else
leafDatum = (Datum) 0;
/*
* Compute space needed for a leaf tuple containing the given datum.
*
* If it isn't gonna fit, and the opclass can't reduce the datum size by
* suffixing, bail out now rather than getting into an endless loop.
*/
if (!isnull)
leafSize = SGLTHDRSZ + sizeof(ItemIdData) +
SpGistGetTypeSize(&state->attLeafType, leafDatum);
else
leafSize = SGDTSIZE + sizeof(ItemIdData);
if (leafSize > SPGIST_PAGE_CAPACITY && !state->config.longValuesOK)
ereport(ERROR,
(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
leafSize - sizeof(ItemIdData),
SPGIST_PAGE_CAPACITY - sizeof(ItemIdData),
RelationGetRelationName(index)),
errhint("Values larger than a buffer page cannot be indexed.")));
/* Initialize "current" to the appropriate root page */
current.blkno = isnull ? SPGIST_NULL_BLKNO : SPGIST_ROOT_BLKNO;
current.buffer = InvalidBuffer;
current.page = NULL;
current.offnum = FirstOffsetNumber;
current.node = -1;
/* "parent" is invalid for the moment */
parent.blkno = InvalidBlockNumber;
parent.buffer = InvalidBuffer;
parent.page = NULL;
parent.offnum = InvalidOffsetNumber;
parent.node = -1;
for (;;)
{
bool isNew = false;
/*
* Bail out if query cancel is pending. We must have this somewhere
* in the loop since a broken opclass could produce an infinite
* picksplit loop.
*/
CHECK_FOR_INTERRUPTS();
if (current.blkno == InvalidBlockNumber)
{
/*
* Create a leaf page. If leafSize is too large to fit on a page,
* we won't actually use the page yet, but it simplifies the API
* for doPickSplit to always have a leaf page at hand; so just
* quietly limit our request to a page size.
*/
current.buffer =
SpGistGetBuffer(index,
GBUF_LEAF | (isnull ? GBUF_NULLS : 0),
Min(leafSize, SPGIST_PAGE_CAPACITY),
&isNew);
current.blkno = BufferGetBlockNumber(current.buffer);
}
else if (parent.buffer == InvalidBuffer)
{
/* we hold no parent-page lock, so no deadlock is possible */
current.buffer = ReadBuffer(index, current.blkno);
LockBuffer(current.buffer, BUFFER_LOCK_EXCLUSIVE);
}
else if (current.blkno != parent.blkno)
{
/* descend to a new child page */
current.buffer = ReadBuffer(index, current.blkno);
/*
* Attempt to acquire lock on child page. We must beware of
* deadlock against another insertion process descending from that
* page to our parent page (see README). If we fail to get lock,
* abandon the insertion and tell our caller to start over.
*
* XXX this could be improved, because failing to get lock on a
* buffer is not proof of a deadlock situation; the lock might be
* held by a reader, or even just background writer/checkpointer
* process. Perhaps it'd be worth retrying after sleeping a bit?
*/
if (!ConditionalLockBuffer(current.buffer))
{
ReleaseBuffer(current.buffer);
UnlockReleaseBuffer(parent.buffer);
return false;
}
}
else
{
/* inner tuple can be stored on the same page as parent one */
current.buffer = parent.buffer;
}
current.page = BufferGetPage(current.buffer);
/* should not arrive at a page of the wrong type */
if (isnull ? !SpGistPageStoresNulls(current.page) :
SpGistPageStoresNulls(current.page))
elog(ERROR, "SPGiST index page %u has wrong nulls flag",
current.blkno);
if (SpGistPageIsLeaf(current.page))
{
SpGistLeafTuple leafTuple;
int nToSplit,
sizeToSplit;
leafTuple = spgFormLeafTuple(state, heapPtr, leafDatum, isnull);
if (leafTuple->size + sizeof(ItemIdData) <=
SpGistPageGetFreeSpace(current.page, 1))
{
/* it fits on page, so insert it and we're done */
addLeafTuple(index, state, leafTuple,
¤t, &parent, isnull, isNew);
break;
}
else if ((sizeToSplit =
checkSplitConditions(index, state, ¤t,
&nToSplit)) < SPGIST_PAGE_CAPACITY / 2 &&
nToSplit < 64 &&
leafTuple->size + sizeof(ItemIdData) + sizeToSplit <= SPGIST_PAGE_CAPACITY)
{
/*
* the amount of data is pretty small, so just move the whole
* chain to another leaf page rather than splitting it.
*/
Assert(!isNew);
moveLeafs(index, state, ¤t, &parent, leafTuple, isnull);
break; /* we're done */
}
else
{
/* picksplit */
if (doPickSplit(index, state, ¤t, &parent,
leafTuple, level, isnull, isNew))
break; /* doPickSplit installed new tuples */
/* leaf tuple will not be inserted yet */
pfree(leafTuple);
/*
* current now describes new inner tuple, go insert into it
*/
Assert(!SpGistPageIsLeaf(current.page));
goto process_inner_tuple;
}
}
else /* non-leaf page */
{
/*
* Apply the opclass choose function to figure out how to insert
* the given datum into the current inner tuple.
*/
SpGistInnerTuple innerTuple;
spgChooseIn in;
spgChooseOut out;
/*
* spgAddNode and spgSplitTuple cases will loop back to here to
* complete the insertion operation. Just in case the choose
* function is broken and produces add or split requests
* repeatedly, check for query cancel.
*/
process_inner_tuple:
CHECK_FOR_INTERRUPTS();
innerTuple = (SpGistInnerTuple) PageGetItem(current.page,
PageGetItemId(current.page, current.offnum));
in.datum = datum;
in.leafDatum = leafDatum;
in.level = level;
in.allTheSame = innerTuple->allTheSame;
in.hasPrefix = (innerTuple->prefixSize > 0);
in.prefixDatum = SGITDATUM(innerTuple, state);
in.nNodes = innerTuple->nNodes;
in.nodeLabels = spgExtractNodeLabels(state, innerTuple);
memset(&out, 0, sizeof(out));
if (!isnull)
{
/* use user-defined choose method */
FunctionCall2Coll(procinfo,
index->rd_indcollation[0],
PointerGetDatum(&in),
PointerGetDatum(&out));
}
else
{
/* force "match" action (to insert to random subnode) */
out.resultType = spgMatchNode;
}
if (innerTuple->allTheSame)
{
/*
* It's not allowed to do an AddNode at an allTheSame tuple.
* Opclass must say "match", in which case we choose a random
* one of the nodes to descend into, or "split".
*/
if (out.resultType == spgAddNode)
elog(ERROR, "cannot add a node to an allTheSame inner tuple");
else if (out.resultType == spgMatchNode)
out.result.matchNode.nodeN = random() % innerTuple->nNodes;
}
switch (out.resultType)
{
case spgMatchNode:
/* Descend to N'th child node */
spgMatchNodeAction(index, state, innerTuple,
¤t, &parent,
out.result.matchNode.nodeN);
/* Adjust level as per opclass request */
level += out.result.matchNode.levelAdd;
/* Replace leafDatum and recompute leafSize */
if (!isnull)
{
leafDatum = out.result.matchNode.restDatum;
leafSize = SGLTHDRSZ + sizeof(ItemIdData) +
SpGistGetTypeSize(&state->attLeafType, leafDatum);
}
/*
* Loop around and attempt to insert the new leafDatum at
* "current" (which might reference an existing child
* tuple, or might be invalid to force us to find a new
* page for the tuple).
*
* Note: if the opclass sets longValuesOK, we rely on the
* choose function to eventually shorten the leafDatum
* enough to fit on a page. We could add a test here to
* complain if the datum doesn't get visibly shorter each
* time, but that could get in the way of opclasses that
* "simplify" datums in a way that doesn't necessarily
* lead to physical shortening on every cycle.
*/
break;
case spgAddNode:
/* AddNode is not sensible if nodes don't have labels */
if (in.nodeLabels == NULL)
elog(ERROR, "cannot add a node to an inner tuple without node labels");
/* Add node to inner tuple, per request */
spgAddNodeAction(index, state, innerTuple,
¤t, &parent,
out.result.addNode.nodeN,
out.result.addNode.nodeLabel);
/*
* Retry insertion into the enlarged node. We assume that
* we'll get a MatchNode result this time.
*/
goto process_inner_tuple;
break;
case spgSplitTuple:
/* Split inner tuple, per request */
spgSplitNodeAction(index, state, innerTuple,
¤t, &out);
/* Retry insertion into the split node */
goto process_inner_tuple;
break;
default:
elog(ERROR, "unrecognized SPGiST choose result: %d",
(int) out.resultType);
break;
}
}
} /* end loop */
/*
* Release any buffers we're still holding. Beware of possibility that
* current and parent reference same buffer.
*/
if (current.buffer != InvalidBuffer)
{
SpGistSetLastUsedPage(index, current.buffer);
UnlockReleaseBuffer(current.buffer);
}
if (parent.buffer != InvalidBuffer &&
parent.buffer != current.buffer)
{
SpGistSetLastUsedPage(index, parent.buffer);
UnlockReleaseBuffer(parent.buffer);
}
return true;
}
相关信息
相关文章
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦