greenplumn sharedsnapshot source code
File path: /src/backend/utils/time/sharedsnapshot.c
/*-------------------------------------------------------------------------
*
* sharedsnapshot.c
* GPDB shared snapshot management.
*
* Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* src/backend/utils/time/sharedsnapshot.c
*
 * In Greenplum, as part of a sliced plan, many postgres processes (qExecs,
 * QEs) may run on a single segment database on behalf of the same user's SQL
 * statement. All of the qExecs that belong to a particular user session on a
 * particular segment database need to have a consistent view of the data.
 * The idea used is called the "Shared Local Snapshot". The shared-memory
 * data structure SharedSnapshotSlot shares session and transaction
 * information among the session's gang processes on a particular database
 * instance. These processes are called a SegMate process group.
*
 * A SegMate process group is a QE (Query Executor) Writer process plus zero
 * or more QE Reader processes. Greenplum needed to invent a SegMate sharing
 * mechanism because in Postgres there is only one backend per session, so
 * most of the needed information is simply available in private memory. With
 * Greenplum's session parallelism on a database instance, we need a way to
 * share not-yet-committed session information among the SegMates. This
 * information includes transaction snapshots, sub-transaction status, the
 * so-called combo-cid mapping, etc.
*
 * An example: the QE Readers need to use the same snapshot and command
 * number information as the QE Writer so they see the current data written
 * by the QE Writer. During a transaction, the QE Writer writes new data into
 * the shared-memory buffer cache. Later in that same transaction, the QE
 * Readers need to recognize which tuples in the shared-memory buffer cache
 * belong to their session's transaction in order to behave correctly.
*
* Another example: the QE readers need to know which sub-transactions are
* active or committed for a session's transaction so they can properly read
* sub-transaction data written by the QE writer for the transaction.
*
 * So, the theme is to share private, not-yet-committed session transaction
 * information with the QE Readers so the whole SegMate process group can
 * work on the transaction correctly. (We mostly think of QE Writers/Readers
 * as being on the segments. However, the master has a special-purpose QE
 * Reader called the Entry DB Singleton, so the SegMate module also works on
 * the master.)
*
 * Each shared snapshot is local to the segment database. At a high level,
 * the Writer gang member establishes a local transaction, acquires a slot in
 * the shared snapshot shmem space, and copies its snapshot information into
 * shared memory where the other qExecs that are its segmates can find it.
 * The following sections cover how shared-memory initialization happens, who
 * writes the snapshot, how and when the readers may read it, locking, etc.
*
 * Shared Memory Initialization: Shared memory is set up by the postmaster.
 * Roughly one slot per user connection on the QD is needed, so that each set
 * of segmates has a place to store its snapshot information. In each slot
 * the QE Writer stores the information defined by SharedSnapshotSlot.
*
 * PQsendMppStatement: The same as PQsendQuery, except that it also sends a
 * serialized snapshot and xid. postgres.c has been modified to accept this
 * new protocol message; it does pretty much the same thing as for a 'Q'
 * (simple query) message, except that it unpacks the snapshot and xid sent
 * by the QD and stores them away. All QEs are sent a QD snapshot during
 * statement dispatch.
*
 * Global Session ID: The shared snapshot shared memory is split into slots;
 * a set of segmates for a given user session requires a single slot. The
 * snapshot information and other shared state are stored within the slot. A
 * unique session id identifies all the components in the system that are
 * working for a single user session; within a single segment database this
 * essentially defines what it means to be "segmates". The shared snapshot
 * slot is identified by this unique session id, which is sent from the QD as
 * the GUC "mpp_session_id". The slot's "slotid" field stores the
 * "mpp_session_id" of the WRITER that uses the slot; readers find the
 * correct slot by looking for the one whose slotid equals their own
 * mpp_session_id.
*
 * Single Writer: The mechanism is simplified by the restriction that only a
 * single qExec in a set of segmates is allowed to write. The single WRITER
 * qExec is the only qExec among all of its segmates that will ever perform
 * database write operations. The benefit of this approach is that the single
 * WRITER qExec is the only member of the set of segmates that needs to
 * participate in global transactions, and only this WRITER qExec really has
 * to do anything during commit; since the others are just reader qExecs,
 * this is not a problem. The single WRITER qExec is also the only qExec that
 * is guaranteed to participate in every dispatched statement for a given
 * user session (at least on that segdb), and it is this WRITER qExec that
 * performs any utility statement.
*
 * Coordinating Readers and Writers: The coordination problem is knowing when
 * the writer has set the snapshot so that the readers can pick it up and use
 * it. In general, we cannot assume that the writer gets to set it before a
 * reader needs it, so we need a mechanism for the reader to (1) know that it
 * is reading the right snapshot and (2) know when it may read it. The
 * mpp_session_id stored in the SharedSnapshotSlot is what lets the reader
 * know it has the right slot, and it knows it can read the slot when the xid
 * and cid in the slot match the transaction id and command id sent from the
 * QD in the SnapshotInfo field. Basically, QE READERS are not allowed to
 * read the shared local snapshot until it carries the same QD statement id
 * as the QE Reader; i.e. the QE WRITER updates the local snapshot and then
 * writes the QD statement id into the slot, which identifies the "freshness"
 * of that information. Currently QE Readers check that value and, if it has
 * not been set yet, sleep (gasp!) for a while. This approach is definitely
 * not elegant; it would be great to replace it with a latch-based approach.
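 *
 * A minimal sketch of that polling pattern (illustrative only; the real
 * reader-side code lives elsewhere in the tree and takes the slot's
 * slotLock before copying anything out):
 *
 *     while (slot->segmateSync != QEDtxContextInfo.segmateSync)
 *         pg_usleep(1000);    -- writer has not published this statement yet
 *     -- now copy the snapshot fields out of the slot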
*
 * Cursor handling through SharedSnapshot: Cursors are a funny case because
 * they read through the snapshot taken when the DECLARE CURSOR command was
 * executed, not through the current snapshot. Originally, the
 * SharedSnapshotSlot was designed for just the current command. The default
 * transaction isolation level is READ COMMITTED, which causes a new snapshot
 * to be taken for each command in an explicit transaction started with BEGIN
 * and completed with COMMIT, etc. So cursors would read through the current
 * snapshot instead of the cursor's snapshot and see data they shouldn't see.
 * The problem is a little more subtle because of the existence of QE Readers
 * and the fact that QE Readers can be created later, long after the DECLARE
 * CURSOR command. So the solution was to dump the current snapshot when the
 * cursor is created, so that subsequently created QE Readers can pick up the
 * right snapshot from that dump and ignore the live fields of the
 * SharedSnapshotSlot.
*
 * Sub-Transaction handling through SharedSnapshot: QE Readers need to know
 * which sub-transactions the QE Writer has committed and which are active so
 * that the QE Readers can see the right data. While a sub-transaction may be
 * committed within an active parent transaction, that data is not formally
 * committed until the parent commits, and active sub-transactions are not
 * even sub-transaction committed yet. So, other transactions cannot see
 * active or committed sub-transaction work yet. Without special logic, a QE
 * Reader would be considered another transaction and would not see the
 * committed or active sub-transaction work. This is because QE Readers do
 * not start their own transaction; we just set a few variables in the xact.c
 * module to make it look as though there is a current transaction, including
 * which sub-transactions are active or committed. This is a kludge. In order
 * for the QE Reader to fake being part of the QE Writer's transaction, we
 * put the current transaction id and the ids of all active and committed
 * sub-transactions into the SharedSnapshotSlot shared-memory structure.
 * Since shared memory is not dynamic, this imposes an arbitrary limit on the
 * number of sub-transaction ids kept in the SharedSnapshotSlot in memory;
 * once that limit is exceeded, the sub-transaction ids are written to temp
 * files on disk. See how the TransactionIdIsCurrentTransactionId procedure
 * in xact.c checks whether the executing backend is a QE Reader (or Entry DB
 * Singleton) and, if it is, walks through the sub-transaction ids in
 * SharedSnapshotSlot.
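 *
 * A rough sketch of that reader-side check (illustrative; the real code is
 * the QE Reader branch of TransactionIdIsCurrentTransactionId in xact.c):
 *
 *     if xid equals the writer's top transaction id stored in the slot
 *         -> treat it as current
 *     else scan the sub-transaction ids recorded in the slot (and, once the
 *          in-memory array has overflowed, the ids spilled to the temp file)
 *         -> treat it as current if found, otherwise not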
*
* -------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/distributedlog.h"
#include "access/twophase.h" /*max_prepared_xacts*/
#include "access/xact.h"
#include "cdb/cdbtm.h"
#include "cdb/cdbvars.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "lib/stringinfo.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "utils/builtins.h"
#include "utils/faultinjector.h"
#include "utils/guc.h"
#include "utils/memutils.h"
#include "utils/resowner.h"
#include "utils/sharedsnapshot.h"
#include "utils/snapmgr.h"
/*
* Distributed Snapshot that gets sent in from the QD to processes running
* in EXECUTE mode.
*/
DtxContext DistributedTransactionContext = DTX_CONTEXT_LOCAL_ONLY;
DtxContextInfo QEDtxContextInfo = DtxContextInfo_StaticInit;
#define DUMP_HASH_SZ 1024
typedef struct DumpEntry
{
uint32 segmate;
FullTransactionId localXid;
Snapshot snapshot;
} DumpEntry;
/* local hash table to store cursor snapshot dumps */
static HTAB *dumpHtab = NULL;
static bool created_dump = false;
static ResourceOwner DumpResOwner = NULL; /* shared snapshot dump resources */
/* MPP Shared Snapshot. */
typedef struct SharedSnapshotStruct
{
int numSlots; /* number of valid Snapshot entries */
int maxSlots; /* allocated size of sharedSnapshotArray */
int nextSlot; /* points to the next avail slot. */
/*
* We now allow direct indexing into this array.
*
* We allocate the XIPS below.
*
* Be very careful when accessing fields inside here.
*/
SharedSnapshotSlot *slots;
TransactionId *xips; /* VARIABLE LENGTH ARRAY */
} SharedSnapshotStruct;
static volatile SharedSnapshotStruct *sharedSnapshotArray;
volatile SharedSnapshotSlot *SharedLocalSnapshotSlot = NULL;
static Size slotSize = 0;
static Size slotCount = 0;
static Size xipEntryCount = 0;
/* prototypes for internal functions */
static SharedSnapshotSlot *SharedSnapshotAdd(int32 slotId);
static SharedSnapshotSlot *SharedSnapshotLookup(int32 slotId);
/*
* Report shared-memory space needed by CreateSharedSnapshot.
*/
Size
SharedSnapshotShmemSize(void)
{
Size size;
/* should be the same as PROCARRAY_MAXPROCS */
xipEntryCount = MaxBackends + max_prepared_xacts;
slotSize = sizeof(SharedSnapshotSlot);
slotSize += mul_size(sizeof(TransactionId), (xipEntryCount));
slotSize = MAXALIGN(slotSize);
	/*
	 * We only really need MaxBackends, but for safety we multiply that by
	 * two (to account for slow de-allocation on cleanup, for instance).
	 *
	 * MaxBackends is only somewhat right. What we really want here is the
	 * MaxBackends value from the QD. But this is at least safe since we know
	 * we don't need *MORE* than MaxBackends. In general, MaxBackends on a QE
	 * is going to be bigger than on the QD by a good bit, or at least it
	 * should be.
	 */
slotCount = NUM_SHARED_SNAPSHOT_SLOTS;
size = offsetof(SharedSnapshotStruct, xips);
size = add_size(size, mul_size(slotSize, slotCount));
RequestNamedLWLockTranche("SharedSnapshotLocks", slotCount);
return MAXALIGN(size);
}
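/*
 * For illustration (the numbers are hypothetical): with MaxBackends = 250
 * and max_prepared_xacts = 250, xipEntryCount is 500, so each slot occupies
 * MAXALIGN(sizeof(SharedSnapshotSlot) + 500 * sizeof(TransactionId)) bytes,
 * and the total request is offsetof(SharedSnapshotStruct, xips) plus
 * NUM_SHARED_SNAPSHOT_SLOTS such slot-sized chunks.
 */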
/*
* Initialize the sharedSnapshot array. This array is used to communicate
* snapshots between qExecs that are segmates.
*/
void
CreateSharedSnapshotArray(void)
{
bool found;
int i;
TransactionId *xip_base=NULL;
/* Create or attach to the SharedSnapshot shared structure */
sharedSnapshotArray = (SharedSnapshotStruct *)
ShmemInitStruct("Shared Snapshot", SharedSnapshotShmemSize(), &found);
Assert(slotCount != 0);
Assert(xipEntryCount != 0);
if (!found)
{
/*
* We're the first - initialize.
*/
LWLockPadded *lock_base;
sharedSnapshotArray->numSlots = 0;
/* slotCount is initialized in SharedSnapshotShmemSize(). */
sharedSnapshotArray->maxSlots = slotCount;
sharedSnapshotArray->nextSlot = 0;
/*
* Set slots to point to the next byte beyond what was allocated for
* SharedSnapshotStruct. xips is the last element in the struct but is
* not included in SharedSnapshotShmemSize allocation.
*/
sharedSnapshotArray->slots = (SharedSnapshotSlot *)&sharedSnapshotArray->xips;
/* xips start just after the last slot structure */
xip_base = (TransactionId *)&sharedSnapshotArray->slots[sharedSnapshotArray->maxSlots];
lock_base = GetNamedLWLockTranche("SharedSnapshotLocks");
for (i=0; i < sharedSnapshotArray->maxSlots; i++)
{
SharedSnapshotSlot *tmpSlot = &sharedSnapshotArray->slots[i];
tmpSlot->slotid = -1;
tmpSlot->slotindex = i;
tmpSlot->slotLock = &lock_base[i].lock;
MemSet(tmpSlot->dump, 0, sizeof(SnapshotDump) * SNAPSHOTDUMPARRAYSZ);
tmpSlot->cur_dump_id = 0;
			/*
			 * Point each slot's xip array into the space allocated after the
			 * slot structs.
			 *
			 * Note: xipEntryCount is initialized in SharedSnapshotShmemSize(),
			 * so each slot gets (MaxBackends + max_prepared_xacts)
			 * transaction ids.
			 */
tmpSlot->snapshot.xip = &xip_base[0];
xip_base += xipEntryCount;
}
}
}
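/*
 * Resulting shared-memory layout (sketch):
 *
 *     SharedSnapshotStruct header   (numSlots, maxSlots, nextSlot, slots, ...)
 *     slots[0] .. slots[maxSlots-1] SharedSnapshotSlot structs
 *     xip area                      maxSlots * xipEntryCount TransactionIds,
 *                                   carved up so that slots[i].snapshot.xip
 *                                   points at its own xipEntryCount-sized chunk
 */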
/*
* Used to dump the internal state of the shared slots for debugging.
*/
char *
SharedSnapshotDump(void)
{
StringInfoData str;
volatile SharedSnapshotStruct *arrayP = sharedSnapshotArray;
int index;
Assert(LWLockHeldByMe(SharedSnapshotLock));
initStringInfo(&str);
appendStringInfo(&str, "Local SharedSnapshot Slot Dump: currSlots: %d maxSlots: %d ",
arrayP->numSlots, arrayP->maxSlots);
for (index=0; index < arrayP->maxSlots; index++)
{
/* need to do byte addressing to find the right slot */
SharedSnapshotSlot *testSlot = &arrayP->slots[index];
if (testSlot->slotid != -1)
{
appendStringInfo(&str, "(SLOT index: %d slotid: %d QDxid: "UINT64_FORMAT" pid: %u)",
testSlot->slotindex, testSlot->slotid, testSlot->distributedXid,
testSlot->writer_proc ? testSlot->writer_proc->pid : 0);
}
}
return str.data;
}
/* Acquires an available slot in the sharedSnapshotArray. The slot is then
* marked with the supplied slotId. This slotId is what others will use to
* find this slot. This should only ever be called by the "writer" qExec.
*
* The slotId should be something that is unique amongst all the possible
* "writer" qExecs active on a segment database at a given moment. It also
* will need to be communicated to the "reader" qExecs so that they can find
* this slot.
*/
static SharedSnapshotSlot *
SharedSnapshotAdd(int32 slotId)
{
SharedSnapshotSlot *slot;
volatile SharedSnapshotStruct *arrayP = sharedSnapshotArray;
int nextSlot, i;
int retryCount = gp_snapshotadd_timeout * 10; /* .1 s per wait */
retry:
LWLockAcquire(SharedSnapshotLock, LW_EXCLUSIVE);
slot = NULL;
for (i=0; i < arrayP->maxSlots; i++)
{
SharedSnapshotSlot *testSlot = &arrayP->slots[i];
if (testSlot->slotindex > arrayP->maxSlots)
{
LWLockRelease(SharedSnapshotLock);
elog(ERROR, "Shared Local Snapshots Array appears corrupted: %s", SharedSnapshotDump());
}
if (testSlot->slotid == slotId)
{
slot = testSlot;
break;
}
}
if (slot != NULL)
{
elog(DEBUG1, "SharedSnapshotAdd: found existing entry for our session-id. id %d retry %d pid %u", slotId, retryCount,
slot->writer_proc ? slot->writer_proc->pid : 0);
if (retryCount > 0)
{
LWLockRelease(SharedSnapshotLock);
retryCount--;
pg_usleep(100000); /* 100ms, wait gp_snapshotadd_timeout seconds max. */
goto retry;
}
else
{
char *slot_dump = SharedSnapshotDump();
LWLockRelease(SharedSnapshotLock);
elog(ERROR, "writer segworker group shared snapshot collision on id %d. Slot array dump: %s",
slotId, slot_dump);
}
}
if (arrayP->numSlots >= arrayP->maxSlots || arrayP->nextSlot == -1)
{
		/*
		 * Oops, no room. This shouldn't happen, as something else should
		 * have complained if we go over MaxBackends.
		 */
LWLockRelease(SharedSnapshotLock);
ereport(FATAL,
(errcode(ERRCODE_TOO_MANY_CONNECTIONS),
errmsg("sorry, too many clients already."),
errdetail("There are no more available slots in the sharedSnapshotArray."),
errhint("Another piece of code should have detected that we have too many clients."
" this probably means that someone isn't releasing their slot properly.")));
}
slot = &arrayP->slots[arrayP->nextSlot];
slot->slotindex = arrayP->nextSlot;
/*
* find the next available slot
*/
nextSlot = -1;
for (i=arrayP->nextSlot+1; i < arrayP->maxSlots; i++)
{
SharedSnapshotSlot *tmpSlot = &arrayP->slots[i];
if (tmpSlot->slotid == -1)
{
nextSlot = i;
break;
}
}
arrayP->nextSlot = nextSlot;
arrayP->numSlots++;
/* initialize some things */
slot->slotid = slotId;
slot->fullXid = InvalidFullTransactionId;
slot->startTimestamp = 0;
slot->distributedXid = InvalidDistributedTransactionId;
slot->segmateSync = 0;
/* Remember the writer proc for IsCurrentTransactionIdForReader */
slot->writer_proc = MyProc;
slot->writer_xact = MyPgXact;
LWLockRelease(SharedSnapshotLock);
return slot;
}
void
GetSlotTableDebugInfo(void **snapshotArray, int *maxSlots)
{
*snapshotArray = (void *)sharedSnapshotArray;
*maxSlots = sharedSnapshotArray->maxSlots;
}
/*
 * Used by "reader" qExecs to find the slot in the sharedSnapshotArray with
 * the specified slotId. In general, we should always be able to find the
 * specified slot unless something unexpected has happened. If the slot is
 * not found, NULL is returned.
 *
 * MPP-4599: retry in the same pattern as the writer.
 */
static SharedSnapshotSlot *
SharedSnapshotLookup(int32 slotId)
{
SharedSnapshotSlot *slot = NULL;
volatile SharedSnapshotStruct *arrayP = sharedSnapshotArray;
int retryCount = gp_snapshotadd_timeout * 10; /* .1 s per wait */
int index;
for (;;)
{
CHECK_FOR_INTERRUPTS();
LWLockAcquire(SharedSnapshotLock, LW_SHARED);
for (index=0; index < arrayP->maxSlots; index++)
{
SharedSnapshotSlot *testSlot;
testSlot = &arrayP->slots[index];
if (testSlot->slotindex > arrayP->maxSlots)
elog(ERROR, "Shared Local Snapshots Array appears corrupted: %s", SharedSnapshotDump());
if (testSlot->slotid == slotId)
{
slot = testSlot;
break;
}
}
LWLockRelease(SharedSnapshotLock);
if (slot != NULL)
{
break;
}
else
{
if (retryCount > 0)
{
retryCount--;
pg_usleep(100000); /* 100ms, wait gp_snapshotadd_timeout seconds max. */
}
else
{
break;
}
}
}
return slot;
}
/*
* Used by the "writer" qExec to "release" the slot it had been using.
*
*/
void
SharedSnapshotRemove(volatile SharedSnapshotSlot *slot, char *creatorDescription)
{
int slotId = slot->slotid;
LWLockAcquire(SharedSnapshotLock, LW_EXCLUSIVE);
	/*
	 * Determine if we need to modify the next available slot to use. We only
	 * do this if our slotindex is lower than the existing one.
	 */
if (sharedSnapshotArray->nextSlot == -1 || slot->slotindex < sharedSnapshotArray->nextSlot)
{
if (slot->slotindex > sharedSnapshotArray->maxSlots)
elog(ERROR, "Shared Local Snapshots slot has a bogus slotindex: %d. slot array dump: %s",
slot->slotindex, SharedSnapshotDump());
sharedSnapshotArray->nextSlot = slot->slotindex;
}
/* reset the slotid which marks it as being unused. */
slot->slotid = -1;
slot->fullXid = InvalidFullTransactionId;
slot->startTimestamp = 0;
slot->distributedXid = InvalidDistributedTransactionId;
slot->segmateSync = 0;
sharedSnapshotArray->numSlots -= 1;
LWLockRelease(SharedSnapshotLock);
elog((Debug_print_full_dtm ? LOG : DEBUG5),"SharedSnapshotRemove removed slot for slotId = %d, creator = %s (address %p)",
slotId, creatorDescription, SharedLocalSnapshotSlot);
}
void
addSharedSnapshot(char *creatorDescription, int id)
{
SharedLocalSnapshotSlot = SharedSnapshotAdd(id);
elog((Debug_print_full_dtm ? LOG : DEBUG5),"%s added Shared Local Snapshot slot for gp_session_id = %d (address %p)",
creatorDescription, id, SharedLocalSnapshotSlot);
}
void
lookupSharedSnapshot(char *lookerDescription, char *creatorDescription, int id)
{
SharedSnapshotSlot *slot;
slot = SharedSnapshotLookup(id);
if (slot == NULL)
{
LWLockAcquire(SharedSnapshotLock, LW_SHARED);
ereport(ERROR,
(errmsg("%s could not find Shared Local Snapshot!",
lookerDescription),
errdetail("Tried to find a shared snapshot slot with id: %d "
"and found none. Shared Local Snapshots dump: %s", id,
SharedSnapshotDump()),
errhint("Either this %s was created before the %s or the %s died.",
lookerDescription, creatorDescription, creatorDescription)));
}
SharedLocalSnapshotSlot = slot;
elog((Debug_print_full_dtm ? LOG : DEBUG5),"%s found Shared Local Snapshot slot for gp_session_id = %d created by %s (address %p)",
lookerDescription, id, creatorDescription, SharedLocalSnapshotSlot);
}
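/*
 * Typical usage at QE startup (a sketch; the actual call sites and the
 * description strings passed in differ in the dispatch/DTM code):
 *
 *     if (Gp_is_writer)
 *         addSharedSnapshot("Writer qExec", gp_session_id);
 *     else
 *         lookupSharedSnapshot("Reader qExec", "Writer qExec", gp_session_id);
 *
 * Afterwards SharedLocalSnapshotSlot points at this SegMate group's slot.
 */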
/*
* Dump the shared local snapshot, so that the readers can pick it up.
*/
void
dumpSharedLocalSnapshot_forCursor(void)
{
ResourceOwner oldowner;
SharedSnapshotSlot *src = NULL;
Assert(Gp_role == GP_ROLE_DISPATCH || (Gp_role == GP_ROLE_EXECUTE && Gp_is_writer));
Assert(SharedLocalSnapshotSlot != NULL);
	if (DumpResOwner == NULL)
DumpResOwner = ResourceOwnerCreate(NULL, "SharedSnapshotDumpResOwner");
LWLockAcquire(SharedLocalSnapshotSlot->slotLock, LW_EXCLUSIVE);
created_dump = true;
oldowner = CurrentResourceOwner;
CurrentResourceOwner = DumpResOwner;
src = (SharedSnapshotSlot *)SharedLocalSnapshotSlot;
	Size		sz;
dsm_segment *segment;
int id = src->cur_dump_id;
volatile SnapshotDump *pDump = &src->dump[id];
if(pDump->segment)
dsm_detach(pDump->segment);
sz = EstimateSnapshotSpace(&src->snapshot);
segment = dsm_create(sz, 0);
char *ptr = dsm_segment_address(segment);
SerializeSnapshot(&src->snapshot, ptr);
pDump->segment = segment;
pDump->handle = dsm_segment_handle(segment);
pDump->segmateSync = src->segmateSync;
pDump->distributedXid = src->distributedXid;
pDump->localXid = src->fullXid;
elog(LOG, "Dump syncmate : %u snapshot to slot %d", src->segmateSync, id);
src->cur_dump_id =
(src->cur_dump_id + 1) % SNAPSHOTDUMPARRAYSZ;
CurrentResourceOwner = oldowner;
LWLockRelease(SharedLocalSnapshotSlot->slotLock);
}
void
readSharedLocalSnapshot_forCursor(Snapshot snapshot, DtxContext distributedTransactionContext)
{
bool found;
SharedSnapshotSlot *src = NULL;
FullTransactionId localXid;
Assert(Gp_role == GP_ROLE_EXECUTE);
Assert(!Gp_is_writer);
Assert(SharedLocalSnapshotSlot != NULL);
Assert(snapshot->xip != NULL);
SIMPLE_FAULT_INJECTOR("before_read_shared_snapshot_for_cursor");
if (dumpHtab == NULL)
{
HASHCTL hash_ctl;
memset(&hash_ctl, 0, sizeof(hash_ctl));
hash_ctl.keysize = sizeof(uint32);
hash_ctl.entrysize = sizeof(DumpEntry);
hash_ctl.hcxt = TopTransactionContext;
dumpHtab = hash_create("snapshot dump",
DUMP_HASH_SZ ,
&hash_ctl,
HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
}
/* check segmate in local memory, only sync from shared memory once */
DumpEntry *entry = hash_search(dumpHtab, &QEDtxContextInfo.segmateSync, HASH_ENTER, &found);
volatile SnapshotDump *pDump = NULL;
if (!found)
{
src = (SharedSnapshotSlot *)SharedLocalSnapshotSlot;
LWLockAcquire(src->slotLock, LW_SHARED);
int search_finish_id = src->cur_dump_id;
int search_iter = search_finish_id;
		do {
if (search_iter < 0)
search_iter = SNAPSHOTDUMPARRAYSZ - 1;
if (src->dump[search_iter].segmateSync == QEDtxContextInfo.segmateSync &&
src->dump[search_iter].distributedXid == QEDtxContextInfo.distributedXid)
{
pDump = &src->dump[search_iter];
found = true;
break;
}
			search_iter--;
} while (search_iter != search_finish_id);
if (!found)
{
LWLockRelease(SharedLocalSnapshotSlot->slotLock);
LWLockAcquire(SharedSnapshotLock, LW_SHARED); /* For SharedSnapshotDump() */
ereport(ERROR, (errmsg("could not find Shared Local Snapshot!"),
errdetail("Tried to set the shared local snapshot slot with segmate: %u "
"and failed. Shared Local Snapshots dump: %s", QEDtxContextInfo.segmateSync,
SharedSnapshotDump())));
}
Assert(pDump->handle != 0);
dsm_segment* segment = dsm_attach(pDump->handle);
char *ptr = dsm_segment_address(segment);
entry->snapshot = RestoreSnapshot(ptr);
entry->localXid = pDump->localXid;
dsm_detach(segment);
LWLockRelease(SharedLocalSnapshotSlot->slotLock);
}
Snapshot dumpsnapshot = entry->snapshot;
localXid = entry->localXid;
snapshot->xmin = dumpsnapshot->xmin;
snapshot->xmax = dumpsnapshot->xmax;
snapshot->xcnt = dumpsnapshot->xcnt;
memcpy(snapshot->xip, dumpsnapshot->xip, snapshot->xcnt * sizeof(TransactionId));
/* zero out the slack in the xip-array */
memset(snapshot->xip + snapshot->xcnt, 0, (xipEntryCount - snapshot->xcnt)*sizeof(TransactionId));
snapshot->curcid = dumpsnapshot->curcid;
SetSharedTransactionId_reader(
localXid,
snapshot->curcid,
distributedTransactionContext);
}
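/*
 * Cursor flow within a SegMate group (a sketch under the usual dispatch
 * assumptions):
 *
 *     QE Writer, while executing DECLARE CURSOR:
 *         dumpSharedLocalSnapshot_forCursor();
 *             -- serializes the slot's snapshot into a dsm segment and
 *             -- records it in the slot's dump[] ring, keyed by segmateSync
 *
 *     QE Reader created later for the cursor:
 *         readSharedLocalSnapshot_forCursor(snapshot, DistributedTransactionContext);
 *             -- attaches to the matching dump, restores it into 'snapshot',
 *             -- and adopts the writer's xid/cid for visibility checks
 */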
/*
* Free any shared snapshot files.
*/
void
AtEOXact_SharedSnapshot(void)
{
dumpHtab = NULL;
if (created_dump)
{
LWLockAcquire(SharedLocalSnapshotSlot->slotLock, LW_EXCLUSIVE);
/* release dump dsm */
ResourceOwnerRelease(DumpResOwner,
RESOURCE_RELEASE_BEFORE_LOCKS,
false, /* isCommit */
true); /* isTopLevel */
SharedLocalSnapshotSlot->cur_dump_id = 0;
MemSet(SharedLocalSnapshotSlot->dump, 0, sizeof(SnapshotDump) * SNAPSHOTDUMPARRAYSZ);
created_dump = false;
LWLockRelease(SharedLocalSnapshotSlot->slotLock);
}
}
/*
* LogDistributedSnapshotInfo
* Log the distributed snapshot info in a given snapshot.
*
* The 'prefix' is used to prefix the log message.
*/
void
LogDistributedSnapshotInfo(Snapshot snapshot, const char *prefix)
{
if (!IsMVCCSnapshot(snapshot))
return;
StringInfoData buf;
initStringInfo(&buf);
DistributedSnapshotWithLocalMapping *mapping =
&(snapshot->distribSnapshotWithLocalMapping);
DistributedSnapshot *ds = &mapping->ds;
appendStringInfo(&buf, "%s Distributed snapshot info: "
"xminAllDistributedSnapshots="UINT64_FORMAT", distribSnapshotId=%d"
", xmin="UINT64_FORMAT", xmax="UINT64_FORMAT", count=%d",
prefix,
ds->xminAllDistributedSnapshots,
ds->distribSnapshotId,
ds->xmin,
ds->xmax,
ds->count);
appendStringInfoString(&buf, ", In progress array: {");
for (int no = 0; no < ds->count; no++)
{
if (no != 0)
{
appendStringInfo(&buf, ", (dx"UINT64_FORMAT")",
ds->inProgressXidArray[no]);
}
else
{
appendStringInfo(&buf, " (dx"UINT64_FORMAT")",
ds->inProgressXidArray[no]);
}
}
appendStringInfoString(&buf, "}");
elog(LOG, "%s", buf.data);
pfree(buf.data);
}
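/*
 * Example of the resulting log line (values are hypothetical, and the real
 * output is a single line):
 *
 * LOG:  reader Distributed snapshot info: xminAllDistributedSnapshots=1000,
 *       distribSnapshotId=7, xmin=1000, xmax=1005, count=2,
 *       In progress array: { (dx1001), (dx1003)}
 */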