greenplum brin_pageops 源码
greenplum brin_pageops 代码
文件路径:/src/backend/access/brin/brin_pageops.c
/*
* brin_pageops.c
* Page-handling routines for BRIN indexes
*
* Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* src/backend/access/brin/brin_pageops.c
*/
#include "postgres.h"
#include "access/brin_pageops.h"
#include "access/brin_page.h"
#include "access/brin_revmap.h"
#include "access/brin_xlog.h"
#include "access/xloginsert.h"
#include "miscadmin.h"
#include "storage/bufmgr.h"
#include "storage/freespace.h"
#include "storage/lmgr.h"
#include "storage/smgr.h"
#include "utils/rel.h"
/*
 * Largest tuple that can be stored on a BRIN_PAGETYPE_REGULAR page.
 *
 * Unlike other index AMs, BRIN is content with a page that holds a single
 * item, so the limit is simply the block size minus the aligned page header
 * (with room for one line pointer) and minus the aligned special space.
 */
#define BrinMaxItemSize \
	MAXALIGN_DOWN(BLCKSZ - \
				  MAXALIGN(SizeOfPageHeaderData + sizeof(ItemIdData)) - \
				  MAXALIGN(sizeof(BrinSpecialSpace)))
/* Forward declarations of the file-local helpers defined further below. */

/* Return a locked page able to hold itemsz bytes, extending the index if needed. */
static Buffer brin_getinsertbuffer(Relation irel, Buffer oldbuf, Size itemsz,
bool *extended);
/* Free space on a regular page; 0 for non-regular or evacuating pages. */
static Size br_page_get_freespace(Page page);
/* Initialize a blank page as a regular BRIN page, WAL-log it, record it in FSM. */
static void brin_initialize_empty_new_buffer(Relation idxrel, Buffer buffer);
/*
 * Update tuple origtup (size origsz), located in offset oldoff of buffer
 * oldbuf, to newtup (size newsz) as summary tuple for the page range starting
 * at heapBlk.  oldbuf must not be locked on entry, and is not locked at exit.
 *
 * Parameters:
 *	idxrel			the BRIN index relation
 *	pagesPerRange	passed through to the revmap update and the WAL record
 *	revmap			revmap access object for idxrel
 *	heapBlk			first heap block of the page range being summarized
 *	oldbuf, oldoff	buffer and offset where the existing tuple lives
 *	origtup, origsz	the tuple as the caller last saw it; compared against the
 *					on-page tuple to detect concurrent changes
 *	newtup, newsz	replacement tuple; newsz must already be MAXALIGNed
 *	samepage		caller's belief that the new tuple fits on the old page
 *	skipextend		if true, do not call brinRevmapExtend first
 *
 * If samepage is true, only the old page is locked and the tuple is replaced
 * in place; if it turns out there is no room after all, nothing is changed
 * and false is returned so that the caller can start over.  If samepage is
 * false, a target page is obtained from brin_getinsertbuffer; the tuple is
 * still replaced in place if the old page happens to have room, otherwise it
 * is moved to the target page and the revmap is repointed.
 *
 * If the update is successful, return true; the revmap is updated to point to
 * the new tuple.  If the update is not done for whatever reason, return false.
 * Caller may retry the update if this happens.
 *
 * An ERROR is raised if newsz exceeds BrinMaxItemSize.
 */
bool
brin_doupdate(Relation idxrel, BlockNumber pagesPerRange,
BrinRevmap *revmap, BlockNumber heapBlk,
Buffer oldbuf, OffsetNumber oldoff,
const BrinTuple *origtup, Size origsz,
const BrinTuple *newtup, Size newsz,
bool samepage, bool skipextend)
{
Page oldpage;
ItemId oldlp;
BrinTuple *oldtup;
Size oldsz;
Buffer newbuf;
BlockNumber newblk = InvalidBlockNumber;
bool extended;
/* Callers must hand us an already MAXALIGNed size. */
Assert(newsz == MAXALIGN(newsz));
/* If the item is oversized, don't bother. */
if (newsz > BrinMaxItemSize)
{
ereport(ERROR,
(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
newsz, BrinMaxItemSize, RelationGetRelationName(idxrel))));
return false; /* keep compiler quiet */
}
/* make sure the revmap is long enough to contain the entry we need */
if (!skipextend)
brinRevmapExtend(revmap, heapBlk);
if (!samepage)
{
/*
 * need a page on which to put the item; brin_getinsertbuffer also
 * takes the lock on oldbuf for us
 */
newbuf = brin_getinsertbuffer(idxrel, oldbuf, newsz, &extended);
if (!BufferIsValid(newbuf))
{
Assert(!extended);
return false;
}
/*
 * Note: it's possible (though unlikely) that the returned newbuf is
 * the same as oldbuf, if brin_getinsertbuffer determined that the old
 * buffer does in fact have enough space.
 */
if (newbuf == oldbuf)
{
Assert(!extended);
newbuf = InvalidBuffer;
}
else
newblk = BufferGetBlockNumber(newbuf);
}
else
{
/* In-place update expected: only the old page needs to be locked. */
LockBuffer(oldbuf, BUFFER_LOCK_EXCLUSIVE);
newbuf = InvalidBuffer;
extended = false;
}
oldpage = BufferGetPage(oldbuf);
oldlp = PageGetItemId(oldpage, oldoff);
/*
 * Check that the old tuple wasn't updated concurrently: it might have
 * moved someplace else entirely, and for that matter the whole page
 * might've become a revmap page.  Note that in the first two cases
 * checked here, the "oldlp" we just calculated is garbage; but
 * PageGetItemId() is simple enough that it was safe to do that
 * calculation anyway.
 */
if (!BRIN_IS_REGULAR_PAGE(oldpage) ||
oldoff > PageGetMaxOffsetNumber(oldpage) ||
!ItemIdIsNormal(oldlp))
{
LockBuffer(oldbuf, BUFFER_LOCK_UNLOCK);
/*
 * If this happens, and the new buffer was obtained by extending the
 * relation, then we need to ensure we don't leave it uninitialized or
 * forget about it.
 */
if (BufferIsValid(newbuf))
{
if (extended)
brin_initialize_empty_new_buffer(idxrel, newbuf);
UnlockReleaseBuffer(newbuf);
if (extended)
FreeSpaceMapVacuumRange(idxrel, newblk, newblk + 1);
}
return false;
}
oldsz = ItemIdGetLength(oldlp);
oldtup = (BrinTuple *) PageGetItem(oldpage, oldlp);
/*
 * ... or it might have been updated in place to different contents.
 */
if (!brin_tuples_equal(oldtup, oldsz, origtup, origsz))
{
LockBuffer(oldbuf, BUFFER_LOCK_UNLOCK);
if (BufferIsValid(newbuf))
{
/* As above, initialize and record new page if we got one */
if (extended)
brin_initialize_empty_new_buffer(idxrel, newbuf);
UnlockReleaseBuffer(newbuf);
if (extended)
FreeSpaceMapVacuumRange(idxrel, newblk, newblk + 1);
}
return false;
}
/*
 * Great, the old tuple is intact.  We can proceed with the update.
 *
 * If there's enough room in the old page for the new tuple, replace it.
 *
 * Note that there might now be enough space on the page even though the
 * caller told us there isn't, if a concurrent update moved another tuple
 * elsewhere or replaced a tuple with a smaller one.
 *
 * A page flagged BRIN_EVACUATE_PAGE is never updated in place.
 */
if (((BrinPageFlags(oldpage) & BRIN_EVACUATE_PAGE) == 0) &&
brin_can_do_samepage_update(oldbuf, origsz, newsz))
{
START_CRIT_SECTION();
if (!PageIndexTupleOverwrite(oldpage, oldoff, (Item) unconstify(BrinTuple *, newtup), newsz))
elog(ERROR, "failed to replace BRIN tuple");
MarkBufferDirty(oldbuf);
/* XLOG stuff */
if (RelationNeedsWAL(idxrel))
{
xl_brin_samepage_update xlrec;
XLogRecPtr recptr;
uint8 info = XLOG_BRIN_SAMEPAGE_UPDATE;
xlrec.offnum = oldoff;
XLogBeginInsert();
XLogRegisterData((char *) &xlrec, SizeOfBrinSamepageUpdate);
XLogRegisterBuffer(0, oldbuf, REGBUF_STANDARD);
XLogRegisterBufData(0, (char *) unconstify(BrinTuple *, newtup), newsz);
recptr = XLogInsert(RM_BRIN_ID, info);
PageSetLSN(oldpage, recptr);
}
END_CRIT_SECTION();
LockBuffer(oldbuf, BUFFER_LOCK_UNLOCK);
/* The target page, if we fetched one, turned out not to be needed. */
if (BufferIsValid(newbuf))
{
/* As above, initialize and record new page if we got one */
if (extended)
brin_initialize_empty_new_buffer(idxrel, newbuf);
UnlockReleaseBuffer(newbuf);
if (extended)
FreeSpaceMapVacuumRange(idxrel, newblk, newblk + 1);
}
return true;
}
else if (newbuf == InvalidBuffer)
{
/*
 * Not enough space, but caller said that there was.  Tell them to
 * start over.
 */
LockBuffer(oldbuf, BUFFER_LOCK_UNLOCK);
return false;
}
else
{
/*
 * Not enough free space on the oldpage.  Put the new tuple on the new
 * page, and update the revmap.
 */
Page newpage = BufferGetPage(newbuf);
Buffer revmapbuf;
ItemPointerData newtid;
OffsetNumber newoff;
Size freespace = 0;
revmapbuf = brinLockRevmapPageForUpdate(revmap, heapBlk);
START_CRIT_SECTION();
/*
 * We need to initialize the page if it's newly obtained.  Note we
 * will WAL-log the initialization as part of the update, so we don't
 * need to do that here.
 */
if (extended)
brin_page_init(newpage, BRIN_PAGETYPE_REGULAR);
/* Remove the old copy and add the new one on the target page. */
PageIndexTupleDeleteNoCompact(oldpage, oldoff);
newoff = PageAddItem(newpage, (Item) unconstify(BrinTuple *, newtup), newsz,
InvalidOffsetNumber, false, false);
if (newoff == InvalidOffsetNumber)
elog(ERROR, "failed to add BRIN tuple to new page");
MarkBufferDirty(oldbuf);
MarkBufferDirty(newbuf);
/* needed to update FSM below */
if (extended)
freespace = br_page_get_freespace(newpage);
/* Repoint the revmap entry for this range at the tuple's new location. */
ItemPointerSet(&newtid, newblk, newoff);
brinSetHeapBlockItemptr(revmapbuf, pagesPerRange, heapBlk, newtid);
MarkBufferDirty(revmapbuf);
/* XLOG stuff */
if (RelationNeedsWAL(idxrel))
{
xl_brin_update xlrec;
XLogRecPtr recptr;
uint8 info;
info = XLOG_BRIN_UPDATE | (extended ? XLOG_BRIN_INIT_PAGE : 0);
xlrec.insert.offnum = newoff;
xlrec.insert.heapBlk = heapBlk;
xlrec.insert.pagesPerRange = pagesPerRange;
xlrec.oldOffnum = oldoff;
XLogBeginInsert();
/* new page */
XLogRegisterData((char *) &xlrec, SizeOfBrinUpdate);
XLogRegisterBuffer(0, newbuf, REGBUF_STANDARD | (extended ? REGBUF_WILL_INIT : 0));
XLogRegisterBufData(0, (char *) unconstify(BrinTuple *, newtup), newsz);
/* revmap page */
XLogRegisterBuffer(1, revmapbuf, 0);
/* old page */
XLogRegisterBuffer(2, oldbuf, REGBUF_STANDARD);
recptr = XLogInsert(RM_BRIN_ID, info);
PageSetLSN(oldpage, recptr);
PageSetLSN(newpage, recptr);
PageSetLSN(BufferGetPage(revmapbuf), recptr);
}
END_CRIT_SECTION();
/* Only newbuf's pin is dropped here; oldbuf and revmapbuf are just unlocked. */
LockBuffer(revmapbuf, BUFFER_LOCK_UNLOCK);
LockBuffer(oldbuf, BUFFER_LOCK_UNLOCK);
UnlockReleaseBuffer(newbuf);
if (extended)
{
RecordPageWithFreeSpace(idxrel, newblk, freespace);
FreeSpaceMapVacuumRange(idxrel, newblk, newblk + 1);
}
return true;
}
}
/*
 * Tell whether brin_doupdate could replace a tuple of size origsz with one of
 * size newsz without leaving the page held in "buffer".
 *
 * That is the case when the tuple does not grow, or when the page's exact
 * free space is enough to absorb the growth.
 */
bool
brin_can_do_samepage_update(Buffer buffer, Size origsz, Size newsz)
{
	/* A replacement that is not larger always fits where the old one was. */
	if (newsz <= origsz)
		return true;

	/* Otherwise the page has to have room for the extra bytes. */
	return PageGetExactFreeSpace(BufferGetPage(buffer)) >= (newsz - origsz);
}
/*
 * Insert an index tuple into the index relation.  The revmap is updated to
 * mark the range containing the given page as pointing to the inserted entry.
 * A WAL record is written.
 *
 * Parameters:
 *	idxrel			the BRIN index relation
 *	pagesPerRange	passed through to the revmap update and the WAL record
 *	revmap			revmap access object for idxrel
 *	buffer			in/out: a pinned candidate buffer, or InvalidBuffer; on
 *					return it holds the buffer the tuple was placed in
 *	heapBlk			first heap block of the page range being summarized
 *	tup, itemsz		the tuple to insert; itemsz must already be MAXALIGNed
 *
 * The buffer, if valid, is first checked for free space to insert the new
 * entry; if there isn't enough, a new buffer is obtained and pinned.  No
 * buffer lock must be held on entry, no buffer lock is held on exit.
 *
 * Return value is the offset number where the tuple was inserted.  An ERROR
 * is raised if itemsz exceeds BrinMaxItemSize.
 */
OffsetNumber
brin_doinsert(Relation idxrel, BlockNumber pagesPerRange,
BrinRevmap *revmap, Buffer *buffer, BlockNumber heapBlk,
BrinTuple *tup, Size itemsz)
{
Page page;
BlockNumber blk;
OffsetNumber off;
Size freespace = 0;
Buffer revmapbuf;
ItemPointerData tid;
bool extended;
/* Callers must hand us an already MAXALIGNed size. */
Assert(itemsz == MAXALIGN(itemsz));
/* If the item is oversized, don't even bother. */
if (itemsz > BrinMaxItemSize)
{
ereport(ERROR,
(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
itemsz, BrinMaxItemSize, RelationGetRelationName(idxrel))));
return InvalidOffsetNumber; /* keep compiler quiet */
}
/* Make sure the revmap is long enough to contain the entry we need */
brinRevmapExtend(revmap, heapBlk);
/*
 * Acquire lock on buffer supplied by caller, if any.  If it doesn't have
 * enough space, unpin it to obtain a new one below.
 */
if (BufferIsValid(*buffer))
{
/*
 * It's possible that another backend (or ourselves!) extended the
 * revmap over the page we held a pin on, so we cannot assume that
 * it's still a regular page.  br_page_get_freespace reports 0 for a
 * page that is not regular, so such a page is rejected here too.
 */
LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
if (br_page_get_freespace(BufferGetPage(*buffer)) < itemsz)
{
UnlockReleaseBuffer(*buffer);
*buffer = InvalidBuffer;
}
}
/*
 * If we still don't have a usable buffer, have brin_getinsertbuffer
 * obtain one for us.  Keep asking until it hands back a valid buffer.
 */
if (!BufferIsValid(*buffer))
{
do
*buffer = brin_getinsertbuffer(idxrel, InvalidBuffer, itemsz, &extended);
while (!BufferIsValid(*buffer));
}
else
extended = false;
/* Now obtain lock on revmap buffer */
revmapbuf = brinLockRevmapPageForUpdate(revmap, heapBlk);
page = BufferGetPage(*buffer);
blk = BufferGetBlockNumber(*buffer);
/* Execute the actual insertion */
START_CRIT_SECTION();
/* A page obtained by extending the relation has not been initialized yet. */
if (extended)
brin_page_init(page, BRIN_PAGETYPE_REGULAR);
off = PageAddItem(page, (Item) tup, itemsz, InvalidOffsetNumber,
false, false);
if (off == InvalidOffsetNumber)
elog(ERROR, "failed to add BRIN tuple to new page");
MarkBufferDirty(*buffer);
/* needed to update FSM below */
if (extended)
freespace = br_page_get_freespace(page);
/* Point the revmap entry for this range at the new tuple. */
ItemPointerSet(&tid, blk, off);
brinSetHeapBlockItemptr(revmapbuf, pagesPerRange, heapBlk, tid);
MarkBufferDirty(revmapbuf);
/* XLOG stuff */
if (RelationNeedsWAL(idxrel))
{
xl_brin_insert xlrec;
XLogRecPtr recptr;
uint8 info;
info = XLOG_BRIN_INSERT | (extended ? XLOG_BRIN_INIT_PAGE : 0);
xlrec.heapBlk = heapBlk;
xlrec.pagesPerRange = pagesPerRange;
xlrec.offnum = off;
XLogBeginInsert();
XLogRegisterData((char *) &xlrec, SizeOfBrinInsert);
XLogRegisterBuffer(0, *buffer, REGBUF_STANDARD | (extended ? REGBUF_WILL_INIT : 0));
XLogRegisterBufData(0, (char *) tup, itemsz);
XLogRegisterBuffer(1, revmapbuf, 0);
recptr = XLogInsert(RM_BRIN_ID, info);
PageSetLSN(page, recptr);
PageSetLSN(BufferGetPage(revmapbuf), recptr);
}
END_CRIT_SECTION();
/* Tuple is firmly on buffer; we can release our locks (pins are kept) */
LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
LockBuffer(revmapbuf, BUFFER_LOCK_UNLOCK);
BRIN_elog((DEBUG2, "inserted tuple (%u,%u) for range starting at %u",
blk, off, heapBlk));
/* Only a page created by this call gets its free space recorded in the FSM. */
if (extended)
{
RecordPageWithFreeSpace(idxrel, blk, freespace);
FreeSpaceMapVacuumRange(idxrel, blk, blk + 1);
}
return off;
}
/*
 * Set up "page" as an empty BRIN page of the requested type.
 *
 * The page is not marked dirty here; that is left to the caller, as
 * appropriate.
 */
void
brin_page_init(Page page, uint16 type)
{
	const Size	specialSize = sizeof(BrinSpecialSpace);

	PageInit(page, BLCKSZ, specialSize);

	/* Record what kind of BRIN page this is. */
	BrinPageType(page) = type;
}
/*
 * Set up "page" as the metapage of a new BRIN index, storing the given
 * pagesPerRange, version and isAo values in it.
 */
void
brin_metapage_init(Page page, BlockNumber pagesPerRange, uint16 version, bool isAo)
{
	BrinMetaPageData *meta;

	brin_page_init(page, BRIN_PAGETYPE_META);

	meta = (BrinMetaPageData *) PageGetContents(page);

	meta->brinMagic = BRIN_META_MAGIC;
	meta->brinVersion = version;
	meta->pagesPerRange = pagesPerRange;
	meta->isAo = isAo;

	/*
	 * This is a small cheat: block 0 is the metapage itself and so can never
	 * be a real revmap block, but starting from 0 is what lets the first
	 * revmap page be created along with the index.
	 */
	meta->lastRevmapPage = 0;

	/*
	 * pd_lower must point just past the metadata.  Without this, the
	 * metadata would be lost if xlog.c compresses the page.
	 */
	((PageHeader) page)->pd_lower = (char *) (meta + 1) - (char *) page;
}
/*
 * Begin the page evacuation protocol for the page in "buf".
 *
 * The caller must hold an exclusive lock on the page.
 *
 * Returns false, changing nothing, when the page is uninitialized or has no
 * used line pointers; such a page can be taken over for the revmap as it
 * stands.  Otherwise the page is flagged BRIN_EVACUATE_PAGE and true is
 * returned.
 */
bool
brin_start_evacuating_page(Relation idxRel, Buffer buf)
{
	Page		page = BufferGetPage(buf);
	OffsetNumber lastoff;
	OffsetNumber cur;
	bool		hasTuples = false;

	if (PageIsNew(page))
		return false;

	/* Look for any line pointer that is in use. */
	lastoff = PageGetMaxOffsetNumber(page);
	for (cur = FirstOffsetNumber; cur <= lastoff; cur++)
	{
		if (ItemIdIsUsed(PageGetItemId(page, cur)))
		{
			hasTuples = true;
			break;
		}
	}

	if (!hasTuples)
		return false;

	/* prevent other backends from adding more stuff to this page */
	BrinPageFlags(page) |= BRIN_EVACUATE_PAGE;
	MarkBufferDirtyHint(buf, true);

	return true;
}
/*
 * Relocate every tuple found on the page in "buf" to some other page.
 *
 * The caller must hold a lock on the page, which must already carry the
 * BRIN_EVACUATE_PAGE flag.  Both the lock and the pin are released before
 * returning.
 */
void
brin_evacuate_page(Relation idxRel, BlockNumber pagesPerRange,
				   BrinRevmap *revmap, Buffer buf)
{
	Page		page = BufferGetPage(buf);
	OffsetNumber lastoff;
	OffsetNumber cur;
	BrinTuple  *btup = NULL;
	Size		btupsz = 0;

	Assert(BrinPageFlags(page) & BRIN_EVACUATE_PAGE);

	lastoff = PageGetMaxOffsetNumber(page);
	for (cur = FirstOffsetNumber; cur <= lastoff; cur++)
	{
		ItemId		itemid;
		BrinTuple  *copy;
		Size		len;
		bool		moved;

		CHECK_FOR_INTERRUPTS();

		itemid = PageGetItemId(page, cur);
		if (!ItemIdIsUsed(itemid))
			continue;

		/* Work from a private copy, since the page lock is about to go. */
		len = ItemIdGetLength(itemid);
		copy = brin_copy_tuple((BrinTuple *) PageGetItem(page, itemid),
							   len, btup, &btupsz);

		/* brin_doupdate wants the old buffer unlocked on entry. */
		LockBuffer(buf, BUFFER_LOCK_UNLOCK);

		/*
		 * "Update" the tuple to an identical copy of itself, with samepage
		 * false so that a target page is looked up for it.
		 */
		moved = brin_doupdate(idxRel, pagesPerRange, revmap, copy->bt_blkno,
							  buf, cur, copy, len, copy, len, false, true);
		if (!moved)
			cur--;				/* retry */

		LockBuffer(buf, BUFFER_LOCK_SHARE);

		/* It's possible that someone extended the revmap over this page */
		if (!BRIN_IS_REGULAR_PAGE(page))
			break;
	}

	UnlockReleaseBuffer(buf);
}
/*
 * Initialize the given BRIN index page if it needs it, and record its current
 * free space in the FSM.
 *
 * This is mainly useful when vacuuming comes across an uninitialized page,
 * which can be left behind when the relation is extended and a crash happens
 * before the page gets used.
 *
 * Upper FSM pages are not updated here; our caller (brin_vacuum_scan) is
 * expected to fix them once the scan is finished.  Elsewhere in this file it
 * is generally a good idea to propagate newly added free space into the upper
 * FSM pages right away.
 */
void
brin_page_cleanup(Relation idxrel, Buffer buf)
{
	Page		page = BufferGetPage(buf);

	/*
	 * An uninitialized page gets initialized now, which also records it in
	 * the FSM.
	 *
	 * Another backend may be in the middle of extending the relation.  So as
	 * not to re-initialize the page before that backend can grab the buffer
	 * lock, take the extension lock for a moment.  The extending backend
	 * holds that lock from before it gets the page until after the page has
	 * been initialized, so once we get through we are sure to see its
	 * initialization.
	 */
	if (PageIsNew(page))
	{
		bool		stillNew;

		LockRelationForExtension(idxrel, ShareLock);
		UnlockRelationForExtension(idxrel, ShareLock);

		LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
		stillNew = PageIsNew(page);
		if (stillNew)
			brin_initialize_empty_new_buffer(idxrel, buf);
		LockBuffer(buf, BUFFER_LOCK_UNLOCK);

		/* The initialization already took care of the FSM entry. */
		if (stillNew)
			return;
	}

	/* Only regular index pages have their free space measured and recorded */
	if (!(BRIN_IS_META_PAGE(BufferGetPage(buf)) ||
		  BRIN_IS_REVMAP_PAGE(BufferGetPage(buf)) ||
		  BRIN_IS_UPPER_PAGE(BufferGetPage(buf))))
		RecordPageWithFreeSpace(idxrel, BufferGetBlockNumber(buf),
								br_page_get_freespace(page));
}
/*
 * Return a pinned and exclusively locked buffer which can be used to insert an
 * index item of size itemsz (caller must ensure not to request sizes
 * impossible to fulfill).  If oldbuf is a valid buffer, it is also locked (in
 * an order determined to avoid deadlocks).
 *
 * Parameters:
 *	irel		the BRIN index relation
 *	oldbuf		a pinned, unlocked buffer to lock as well, or InvalidBuffer
 *	itemsz		size of the item to insert; must not exceed BrinMaxItemSize
 *	extended	output: set to true if the returned page was obtained by
 *				extending the relation
 *
 * The returned buffer may be oldbuf itself, if the old page turns out to be
 * the chosen target.
 *
 * If we find that the old page is no longer a regular index page (because
 * of a revmap extension), the old buffer is unlocked and we return
 * InvalidBuffer.
 *
 * If there's no existing page with enough free space to accommodate the new
 * item, the relation is extended.  If this happens, *extended is set to true,
 * and it is the caller's responsibility to initialize the page (and WAL-log
 * that fact) prior to use.  The caller should also update the FSM with the
 * page's remaining free space after the insertion.
 *
 * Note that the caller is not expected to update FSM unless *extended is set
 * true.  This policy means that we'll update FSM when a page is created, and
 * when it's found to have too little space for a desired tuple insertion,
 * but not every single time we add a tuple to the page.
 *
 * Note that in some corner cases it is possible for this routine to extend
 * the relation and then not return the new page.  It is this routine's
 * responsibility to WAL-log the page initialization and to record the page in
 * FSM if that happens, since the caller certainly can't do it.
 */
static Buffer
brin_getinsertbuffer(Relation irel, Buffer oldbuf, Size itemsz,
bool *extended)
{
BlockNumber oldblk;
BlockNumber newblk;
Page page;
Size freespace;
/* callers must have checked */
Assert(itemsz <= BrinMaxItemSize);
if (BufferIsValid(oldbuf))
oldblk = BufferGetBlockNumber(oldbuf);
else
oldblk = InvalidBlockNumber;
/* Choose initial target page, re-using existing target if known */
newblk = RelationGetTargetBlock(irel);
if (newblk == InvalidBlockNumber)
newblk = GetPageWithFreeSpace(irel, itemsz);
/*
 * Loop until we find a page with sufficient free space.  By the time we
 * return to caller out of this loop, both buffers are valid and locked;
 * if we have to restart here, neither page is locked and newblk isn't
 * pinned (if it's even valid).
 */
for (;;)
{
Buffer buf;
bool extensionLockHeld = false;
CHECK_FOR_INTERRUPTS();
*extended = false;
if (newblk == InvalidBlockNumber)
{
/*
 * There's not enough free space in any existing index page,
 * according to the FSM: extend the relation to obtain a shiny new
 * page.  The extension lock is skipped for relations that are
 * RELATION_IS_LOCAL.
 */
if (!RELATION_IS_LOCAL(irel))
{
LockRelationForExtension(irel, ExclusiveLock);
extensionLockHeld = true;
}
buf = ReadBuffer(irel, P_NEW);
newblk = BufferGetBlockNumber(buf);
*extended = true;
BRIN_elog((DEBUG2, "brin_getinsertbuffer: extending to page %u",
BufferGetBlockNumber(buf)));
}
else if (newblk == oldblk)
{
/*
 * There's an odd corner-case here where the FSM is out-of-date,
 * and gave us the old page.  Reuse the caller's pin rather than
 * taking another one.
 */
buf = oldbuf;
}
else
{
buf = ReadBuffer(irel, newblk);
}
/*
 * We lock the old buffer first, if it's earlier than the new one; but
 * then we need to check that it hasn't been turned into a revmap page
 * concurrently.  If we detect that that happened, give up and tell
 * caller to start over.
 */
if (BufferIsValid(oldbuf) && oldblk < newblk)
{
LockBuffer(oldbuf, BUFFER_LOCK_EXCLUSIVE);
if (!BRIN_IS_REGULAR_PAGE(BufferGetPage(oldbuf)))
{
LockBuffer(oldbuf, BUFFER_LOCK_UNLOCK);
/*
 * It is possible that the new page was obtained from
 * extending the relation.  In that case, we must be sure to
 * record it in the FSM before leaving, because otherwise the
 * space would be lost forever.  However, we cannot let an
 * uninitialized page get in the FSM, so we need to initialize
 * it first.
 */
if (*extended)
brin_initialize_empty_new_buffer(irel, buf);
if (extensionLockHeld)
UnlockRelationForExtension(irel, ExclusiveLock);
/* buf was never locked on this path, so only the pin is dropped */
ReleaseBuffer(buf);
if (*extended)
{
FreeSpaceMapVacuumRange(irel, newblk, newblk + 1);
/* shouldn't matter, but don't confuse caller */
*extended = false;
}
return InvalidBuffer;
}
}
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
/* The extension lock is held only until the new page's buffer is locked. */
if (extensionLockHeld)
UnlockRelationForExtension(irel, ExclusiveLock);
page = BufferGetPage(buf);
/*
 * We have a new buffer to insert into.  Check that the new page has
 * enough free space, and return it if it does; otherwise start over.
 * (br_page_get_freespace also checks that the FSM didn't hand us a
 * page that has since been repurposed for the revmap.)
 *
 * A page we just obtained by extending the relation has not been
 * initialized yet, so its free space is not measured; BrinMaxItemSize
 * is used instead.
 */
freespace = *extended ?
BrinMaxItemSize : br_page_get_freespace(page);
if (freespace >= itemsz)
{
/* Remember this page as the target for subsequent insertions. */
RelationSetTargetBlock(irel, newblk);
/*
 * Lock the old buffer if not locked already.  Note that in this
 * case we know for sure it's a regular page: it's later than the
 * new page we just got, which is not a revmap page, and revmap
 * pages are always consecutive.
 */
if (BufferIsValid(oldbuf) && oldblk > newblk)
{
LockBuffer(oldbuf, BUFFER_LOCK_EXCLUSIVE);
Assert(BRIN_IS_REGULAR_PAGE(BufferGetPage(oldbuf)));
}
return buf;
}
/* This page is no good. */
/*
 * If an entirely new page does not contain enough free space for the
 * new item, then surely that item is oversized.  Complain loudly; but
 * first make sure we initialize the page and record it as free, for
 * next time.
 */
if (*extended)
{
brin_initialize_empty_new_buffer(irel, buf);
/* since this should not happen, skip FreeSpaceMapVacuum */
ereport(ERROR,
(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
itemsz, freespace, RelationGetRelationName(irel))));
return InvalidBuffer; /* keep compiler quiet */
}
/*
 * Undo our locks before retrying.  When newblk == oldblk, buf is the
 * caller's oldbuf: its pin must be kept, so it is only unlocked, by
 * the statement that follows.
 */
if (newblk != oldblk)
UnlockReleaseBuffer(buf);
if (BufferIsValid(oldbuf) && oldblk <= newblk)
LockBuffer(oldbuf, BUFFER_LOCK_UNLOCK);
/*
 * Update the FSM with the new, presumably smaller, freespace value
 * for this page, then search for a new target page.
 */
newblk = RecordAndGetPageWithFreeSpace(irel, newblk, freespace, itemsz);
}
}
/*
 * Turn the page in "buffer" into an empty regular BRIN page, WAL-log that,
 * and record the page in the FSM.
 *
 * In several corner situations we extend the relation to get a new page and
 * then discover that it cannot be used right away.  Such a page must not go
 * unrecorded in the FSM: there is no mechanism to get the space back, so the
 * index would bloat.  And because the action that would have initialized the
 * page is never WAL-logged, the page would stay uninitialized in a standby
 * (or after recovery).
 *
 * The page is recorded in the FSM here, but updating the upper FSM pages, if
 * that seems appropriate, is up to the caller.
 */
static void
brin_initialize_empty_new_buffer(Relation idxrel, Buffer buffer)
{
	Page		newpage = BufferGetPage(buffer);
	BlockNumber blkno = BufferGetBlockNumber(buffer);

	BRIN_elog((DEBUG2,
			   "brin_initialize_empty_new_buffer: initializing blank page %u",
			   blkno));

	START_CRIT_SECTION();
	brin_page_init(newpage, BRIN_PAGETYPE_REGULAR);
	MarkBufferDirty(buffer);
	log_newpage_buffer(buffer, true);
	END_CRIT_SECTION();

	/*
	 * The FSM update for this page is not WAL-logged.  That is acceptable
	 * because VACUUM will scan the index and update the FSM with pages whose
	 * FSM records were forgotten in a crash.
	 */
	RecordPageWithFreeSpace(idxrel, blkno, br_page_get_freespace(newpage));
}
/*
 * Report how much free space a regular BRIN index page has.
 *
 * Pages that are not regular pages, and pages flagged BRIN_EVACUATE_PAGE,
 * are reported as having 0 bytes free.
 */
static Size
br_page_get_freespace(Page page)
{
	bool		usable;

	usable = BRIN_IS_REGULAR_PAGE(page) &&
		(BrinPageFlags(page) & BRIN_EVACUATE_PAGE) == 0;

	return usable ? PageGetFreeSpace(page) : 0;
}
相关信息
相关文章
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦