greenplumn sharedsnapshot source code

  • 2022-08-18

greenplumn sharedsnapshot code

File path: /src/backend/utils/time/sharedsnapshot.c

/*-------------------------------------------------------------------------
 *
 * sharedsnapshot.c
 *	  GPDB shared snapshot management.
 *
 * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/utils/time/sharedsnapshot.c
 *
 * In Greenplum, as part of slice plans, many postgres processes (qExecs, QEs)
 * run on a single segment database on behalf of the same user's SQL
 * statement. All of the qExecs that belong to a particular user on a
 * particular segment database need to have consistent visibility. The idea
 * used is called the "Shared Local Snapshot". The shared-memory data
 * structure SharedSnapshotSlot shares session and transaction information
 * among a session's gang processes on a particular database instance. These
 * processes are called a SegMate process group.
 *
 * A SegMate process group is a QE (Query Executor) Writer process and 0, 1 or
 * more QE Reader processes. Greenplum needed to invent a SegMate sharing
 * mechanism because in Postgres a session has only one backend, and most
 * needed information is simply available in private memory. With Greenplum
 * session parallelism on database instances, we need a way to share
 * not-yet-committed session information among the SegMates. This information
 * includes transaction snapshots, sub-transaction status, so-called combo-cid
 * mapping, etc.
 *
 * An example: the QE readers need to use the same snapshot and command number
 * information as the QE writer so they see the current data written by the QE
 * writer. During a transaction, the QE Writer writes new data into the
 * shared-memory buffer cache. Later in that same transaction, QE Readers
 * need to recognize which tuples in the shared-memory buffer cache belong to
 * their session's transaction in order to perform correctly.
 *
 * Another example: the QE readers need to know which sub-transactions are
 * active or committed for a session's transaction so they can properly read
 * sub-transaction data written by the QE writer for the transaction.
 *
 * So, the theme is to share private, not-yet-committed session transaction
 * information with the QE Readers so the SegMate process group can all work
 * on the transaction correctly. [We mostly think of QE Writers/Readers being
 * on the segments. However, the master has a special-purpose QE Reader called
 * the Entry DB Singleton, so the SegMate module also works on the master.]
 *
 * Each shared snapshot is local only to the segment database. At a high
 * level, the Writer gang member establishes a local transaction, acquires a
 * slot in the shared snapshot shmem space, and copies the snapshot
 * information into shared memory where the other qExecs that are its segmates
 * can find it. The following sections cover how shared memory initialization
 * happens, who writes the snapshot, how and when the readers may read the
 * snapshot, locking, etc.
 *
 * Shared Memory Initialization: Shared memory is set up by the postmaster.
 * One slot is needed for every user connection on the QD, holding the data
 * structure in which a set of segmates stores its snapshot information. In
 * each slot the QE writer stores the information defined by
 * SharedSnapshotSlot.
 *
 * PQsendMppStatement: This is the same as PQsendQuery except that it also
 * sends a serialized snapshot and xid. postgres.c has been modified to accept
 * this new protocol message. It does pretty much the same thing as it would
 * for a 'Q' (normal query) except that it unpacks the snapshot and xid from
 * the QD and stores them away. All QEs are sent a QD snapshot during
 * statement dispatch.
 *
 * Global Session ID: The shared snapshot shared memory is split into slots. A
 * set of segmates for a given user requires a single slot. The snapshot
 * and other session information are stored within the slot. A unique
 * session id identifies all the components in the system that are working for
 * a single user session. Within a single segment database this essentially
 * defines what it means to be "segmates."  The shared snapshot slot is
 * identified by this unique session id. The unique session id is sent in from
 * the QD as a GUC called "mpp_session_id". So the slot's field "slotid" will
 * store the "mpp_session_id" that the WRITER of the slot uses. Readers will
 * find the correct slot by locating the one whose slotid equals their own
 * mpp_session_id.
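 *
 * As a rough sketch (not a verbatim call sequence), the binding of a backend
 * to its slot by session id looks like this, using the helpers defined later
 * in this file:
 *
 *     if (Gp_is_writer)
 *         addSharedSnapshot("Writer qExec", gp_session_id);
 *     else
 *         lookupSharedSnapshot("Reader qExec", "Writer qExec", gp_session_id);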
 *
 * Single Writer: The mechanism is simplified by introducing the restriction
 * that only a single qExec in a set of segmates is capable of writing. The
 * single WRITER qExec is the only qExec amongst all of its segmates that will
 * ever perform database write operations.  A benefit of this approach is that
 * the single WRITER qExec is the only member of a set of segmates that needs
 * to participate in global transactions, and only this WRITER qExec really
 * has to do anything during commit; the assumption is that, since the others
 * are just reader qExecs, this is not a problem. The single WRITER qExec is
 * the only qExec that is guaranteed to participate in every dispatched
 * statement for a given user (at least on that segdb). It is also this WRITER
 * qExec that performs any utility statement.
 *
 * Coordinating Readers and Writers: The coordination concerns when the writer
 * has set the snapshot such that the readers can get it and use it. In
 * general, we cannot assume that the writer will get around to setting it
 * before the reader needs it, so we need a mechanism for the reader to (1)
 * know that it is reading the right snapshot and (2) know when it can read.
 * The mpp_session_id stored in the SharedSnapshotSlot is the piece of
 * information that lets the reader know it has the right slot. And it knows
 * it can read the slot when the xid and cid in the slot match the transaction
 * id and command id sent in from the QD in the SnapshotInfo field.
 * Basically, QE READERS aren't allowed to read the shared local snapshot
 * until the shared local snapshot has the same QD statement id as the QE
 * Reader; i.e. the QE WRITER updates the local snapshot and then writes the
 * QD statement id into the slot, which identifies the "freshness" of that
 * information. Currently QE readers check that value and, if it has not been
 * set yet, they sleep (gasp!) for a while. This approach is neither elegant
 * nor robust; it would be better to replace it with a latch-based approach.
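 *
 * Sketched very roughly (the real wait loop lives outside this file), the
 * reader side of that handshake amounts to:
 *
 *     while (SharedLocalSnapshotSlot->segmateSync != QEDtxContextInfo.segmateSync)
 *         pg_usleep(...);        -- writer has not published this statement yet
 *     -- now the snapshot in the slot is safe to copy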
 *
 * Cursor handling through SharedSnapshot: Cursors are a funny case because
 * they read through a snapshot taken when the create cursor command was
 * executed, not through the current snapshot. Originally, the
 * SharedSnapshotSlot was designed for just the current command. The default
 * transaction isolation mode is READ COMMITTED, which causes a new snapshot
 * to be created for each command, including each command in an explicit
 * transaction started with BEGIN and completed with COMMIT, etc. So, cursors
 * would read through the current snapshot instead of the create cursor
 * snapshot and see data they shouldn't see. The problem turns out to be a
 * little more subtle because of the existence of QE Readers and the fact that
 * QE Readers can be created later, long after the create cursor command. So,
 * the solution was to serialize the current snapshot to a temporary file
 * during create cursor so that subsequently created QE Readers could get the
 * right snapshot to use from the temporary file and ignore the
 * SharedSnapshotSlot.
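 *
 * In the current code this is handled by the pair of functions below: the QE
 * Writer calls dumpSharedLocalSnapshot_forCursor() when the cursor is
 * declared (the dump now goes into a dynamic shared memory segment rather
 * than a temp file), and a QE Reader created later calls
 * readSharedLocalSnapshot_forCursor() to restore that dumped snapshot instead
 * of reading the live slot contents.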
 *
 * Sub-Transaction handling through SharedSnapshot: QE Readers need to know
 * which sub-transactions the QE Writer has committed and which are active so
 * QE Readers can see the right data. While a sub-transaction may be committed
 * in an active parent transaction, that data is not formally committed until
 * the parent commits. And active sub-transactions are not even committed at
 * the sub-transaction level yet. So, other transactions cannot see active or
 * committed sub-transaction work yet. Without adding special logic to a QE
 * Reader, it would be considered another transaction and not see the
 * committed or active sub-transaction work. This is because QE Readers do not
 * start their own transaction. We just set a few variables in the xact.c
 * module to make it look like there is a current transaction,
 * including which sub-transactions are active or committed. This is a
 * kludge. In order for the QE Reader to fake being part of the QE Writer
 * transaction, we put the current transaction id and the values of all active
 * and committed sub-transaction ids into the SharedSnapshotSlot shared-memory
 * structure. Since shared memory is not dynamic, this imposes an arbitrary
 * limit on the number of sub-transaction ids we keep in the
 * SharedSnapshotSlot in memory. Once this limit is exceeded the
 * sub-transaction ids are written to temp files on disk.  The
 * TransactionIdIsCurrentTransactionId procedure in xact.c checks whether the
 * executing backend is a QE Reader (or Entry DB Singleton) and, if it is,
 * walks through the sub-transaction ids in SharedSnapshotSlot.
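 *
 * Paraphrased as pseudocode (a sketch; the real check lives in xact.c and the
 * exact slot field names are declared in sharedsnapshot.h, not shown here):
 *
 *     TransactionIdIsCurrentTransactionId(xid):
 *         if this backend is a QE Reader (or Entry DB Singleton):
 *             return true if xid is the writer's xid or appears among the
 *                    active/committed sub-xids published through
 *                    SharedSnapshotSlot (or its on-disk overflow)
 *         else:
 *             perform the normal local-transaction check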
 *
 * -------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/distributedlog.h"
#include "access/twophase.h"  /*max_prepared_xacts*/
#include "access/xact.h"
#include "cdb/cdbtm.h"
#include "cdb/cdbvars.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "lib/stringinfo.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "utils/builtins.h"
#include "utils/faultinjector.h"
#include "utils/guc.h"
#include "utils/memutils.h"
#include "utils/resowner.h"
#include "utils/sharedsnapshot.h"
#include "utils/snapmgr.h"

/*
 * Distributed Snapshot that gets sent in from the QD to processes running
 * in EXECUTE mode.
 */
DtxContext DistributedTransactionContext = DTX_CONTEXT_LOCAL_ONLY;

DtxContextInfo QEDtxContextInfo = DtxContextInfo_StaticInit;

#define DUMP_HASH_SZ    1024
typedef struct DumpEntry
{
	uint32  segmate;
	FullTransactionId localXid;
	Snapshot snapshot;
} DumpEntry;

/* local hash table to store cursor snapshot dumps */
static HTAB *dumpHtab = NULL;
static bool created_dump = false;

static ResourceOwner DumpResOwner = NULL;	/* shared snapshot dump resources */

/* MPP Shared Snapshot. */
typedef struct SharedSnapshotStruct
{
	int 		numSlots;		/* number of valid Snapshot entries */
	int			maxSlots;		/* allocated size of sharedSnapshotArray */
	int 		nextSlot;		/* points to the next avail slot. */

	/*
	 * We now allow direct indexing into this array.
	 *
	 * We allocate the XIPS below.
	 *
	 * Be very careful when accessing fields inside here.
	 */
	SharedSnapshotSlot	   *slots;

	TransactionId	   *xips;		/* VARIABLE LENGTH ARRAY */
} SharedSnapshotStruct;

static volatile SharedSnapshotStruct *sharedSnapshotArray;

volatile SharedSnapshotSlot *SharedLocalSnapshotSlot = NULL;

static Size slotSize = 0;
static Size slotCount = 0;
static Size xipEntryCount = 0;

/* prototypes for internal functions */
static SharedSnapshotSlot *SharedSnapshotAdd(int32 slotId);
static SharedSnapshotSlot *SharedSnapshotLookup(int32 slotId);

/*
 * Report shared-memory space needed by CreateSharedSnapshot.
 */
Size
SharedSnapshotShmemSize(void)
{
	Size		size;

	/* should be the same as PROCARRAY_MAXPROCS */
	xipEntryCount = MaxBackends + max_prepared_xacts;

	slotSize = sizeof(SharedSnapshotSlot);
	slotSize += mul_size(sizeof(TransactionId), (xipEntryCount));
	slotSize = MAXALIGN(slotSize);

	/*
	 * We only really need MaxBackends; but for safety we multiply that by two
	 * (to account for slow de-allocation on cleanup, for instance).
	 *
	 * MaxBackends is only somewhat right.  What we really want here is the
	 * MaxBackends value from the QD.  But this is at least safe since we know
	 * we don't need *MORE* than MaxBackends.  But in general MaxBackends on a
	 * QE is going to be bigger than on the QD by a good bit, or at least it
	 * should be.
	 */

	slotCount = NUM_SHARED_SNAPSHOT_SLOTS;

	size = offsetof(SharedSnapshotStruct, xips);
	size = add_size(size, mul_size(slotSize, slotCount));

	RequestNamedLWLockTranche("SharedSnapshotLocks", slotCount);

	return MAXALIGN(size);
}
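
/*
 * As a worked example of the size computed above (hypothetical settings):
 * with MaxBackends = 250 and max_prepared_xacts = 250, each slot reserves
 * room for 500 TransactionIds, so slotSize is roughly
 * MAXALIGN(sizeof(SharedSnapshotSlot) + 500 * sizeof(TransactionId)), and the
 * total request is offsetof(SharedSnapshotStruct, xips) +
 * NUM_SHARED_SNAPSHOT_SLOTS * slotSize, plus one LWLock per slot from the
 * "SharedSnapshotLocks" tranche.
 */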

/*
 * Initialize the sharedSnapshot array.  This array is used to communicate
 * snapshots between qExecs that are segmates.
 */
void
CreateSharedSnapshotArray(void)
{
	bool	found;
	int		i;
	TransactionId *xip_base=NULL;

	/* Create or attach to the SharedSnapshot shared structure */
	sharedSnapshotArray = (SharedSnapshotStruct *)
		ShmemInitStruct("Shared Snapshot", SharedSnapshotShmemSize(), &found);

	Assert(slotCount != 0);
	Assert(xipEntryCount != 0);

	if (!found)
	{
		/*
		 * We're the first - initialize.
		 */
		LWLockPadded *lock_base;

		sharedSnapshotArray->numSlots = 0;

		/* slotCount is initialized in SharedSnapshotShmemSize(). */
		sharedSnapshotArray->maxSlots = slotCount;
		sharedSnapshotArray->nextSlot = 0;

		/*
		 * Set slots to point to the next byte beyond what was allocated for
		 * SharedSnapshotStruct. xips is the last element in the struct but is
		 * not included in SharedSnapshotShmemSize allocation.
		 */
		sharedSnapshotArray->slots = (SharedSnapshotSlot *)&sharedSnapshotArray->xips;

		/* xips start just after the last slot structure */
		xip_base = (TransactionId *)&sharedSnapshotArray->slots[sharedSnapshotArray->maxSlots];

		lock_base = GetNamedLWLockTranche("SharedSnapshotLocks");
		for (i=0; i < sharedSnapshotArray->maxSlots; i++)
		{
			SharedSnapshotSlot *tmpSlot = &sharedSnapshotArray->slots[i];

			tmpSlot->slotid = -1;
			tmpSlot->slotindex = i;
			tmpSlot->slotLock = &lock_base[i].lock;

			MemSet(tmpSlot->dump, 0, sizeof(SnapshotDump) * SNAPSHOTDUMPARRAYSZ);
			tmpSlot->cur_dump_id = 0;
			/*
			 * Fix up the xip array pointer to reference the space allocated
			 * after the slot structs:
			 *
			 * Note: xipEntryCount is initialized in SharedSnapshotShmemSize().
			 * So each slot gets (MaxBackends + max_prepared_xacts) transaction-ids.
			 */
			tmpSlot->snapshot.xip = &xip_base[0];
			xip_base += xipEntryCount;
		}
	}
}
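
/*
 * Resulting shared-memory layout (sketch):
 *
 *   +------------------------------+
 *   | SharedSnapshotStruct header  |  numSlots, maxSlots, nextSlot, ...
 *   +------------------------------+
 *   | slots[0 .. maxSlots-1]       |  SharedSnapshotSlot structs
 *   +------------------------------+
 *   | per-slot xip arrays          |  xipEntryCount TransactionIds each,
 *   |                              |  referenced by slots[i].snapshot.xip
 *   +------------------------------+
 */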

/*
 * Used to dump the internal state of the shared slots for debugging.
 */
char *
SharedSnapshotDump(void)
{
	StringInfoData str;
	volatile SharedSnapshotStruct *arrayP = sharedSnapshotArray;
	int			index;

	Assert(LWLockHeldByMe(SharedSnapshotLock));

	initStringInfo(&str);

	appendStringInfo(&str, "Local SharedSnapshot Slot Dump: currSlots: %d maxSlots: %d ",
					 arrayP->numSlots, arrayP->maxSlots);

	for (index=0; index < arrayP->maxSlots; index++)
	{
		/* need to do byte addressing to find the right slot */
		SharedSnapshotSlot *testSlot = &arrayP->slots[index];

		if (testSlot->slotid != -1)
		{
			appendStringInfo(&str, "(SLOT index: %d slotid: %d QDxid: "UINT64_FORMAT" pid: %u)",
							 testSlot->slotindex, testSlot->slotid, testSlot->distributedXid,
							 testSlot->writer_proc ? testSlot->writer_proc->pid : 0);
		}

	}

	return str.data;
}

/* Acquires an available slot in the sharedSnapshotArray.  The slot is then
 * marked with the supplied slotId.  This slotId is what others will use to
 * find this slot.  This should only ever be called by the "writer" qExec.
 *
 * The slotId should be something that is unique amongst all the possible
 * "writer" qExecs active on a segment database at a given moment.  It also
 * will need to be communicated to the "reader" qExecs so that they can find
 * this slot.
 */
static SharedSnapshotSlot *
SharedSnapshotAdd(int32 slotId)
{
	SharedSnapshotSlot *slot;
	volatile SharedSnapshotStruct *arrayP = sharedSnapshotArray;
	int nextSlot, i;
	int retryCount = gp_snapshotadd_timeout * 10; /* .1 s per wait */

retry:
	LWLockAcquire(SharedSnapshotLock, LW_EXCLUSIVE);

	slot = NULL;

	for (i=0; i < arrayP->maxSlots; i++)
	{
		SharedSnapshotSlot *testSlot = &arrayP->slots[i];

		if (testSlot->slotindex > arrayP->maxSlots)
		{
			LWLockRelease(SharedSnapshotLock);
			elog(ERROR, "Shared Local Snapshots Array appears corrupted: %s", SharedSnapshotDump());
		}

		if (testSlot->slotid == slotId)
		{
			slot = testSlot;
			break;
		}
	}

	if (slot != NULL)
	{
		elog(DEBUG1, "SharedSnapshotAdd: found existing entry for our session-id. id %d retry %d pid %u", slotId, retryCount,
				slot->writer_proc ? slot->writer_proc->pid : 0);

		if (retryCount > 0)
		{
			LWLockRelease(SharedSnapshotLock);
			retryCount--;

			pg_usleep(100000); /* 100ms, wait gp_snapshotadd_timeout seconds max. */
			goto retry;
		}
		else
		{
			char *slot_dump = SharedSnapshotDump();
			LWLockRelease(SharedSnapshotLock);
			elog(ERROR, "writer segworker group shared snapshot collision on id %d. Slot array dump: %s",
				 slotId, slot_dump);
		}
	}

	if (arrayP->numSlots >= arrayP->maxSlots || arrayP->nextSlot == -1)
	{
		/*
		 * Oops, no room.  This shouldn't happen, as something else should have
		 * complained if we go over MaxBackends.
		 */
		LWLockRelease(SharedSnapshotLock);
		ereport(FATAL,
				(errcode(ERRCODE_TOO_MANY_CONNECTIONS),
				 errmsg("sorry, too many clients already."),
				 errdetail("There are no more available slots in the sharedSnapshotArray."),
				 errhint("Another piece of code should have detected that we have too many clients."
						 " this probably means that someone isn't releasing their slot properly.")));
	}

	slot = &arrayP->slots[arrayP->nextSlot];

	slot->slotindex = arrayP->nextSlot;

	/*
	 * find the next available slot
	 */
	nextSlot = -1;
	for (i=arrayP->nextSlot+1; i < arrayP->maxSlots; i++)
	{
		SharedSnapshotSlot *tmpSlot = &arrayP->slots[i];

		if (tmpSlot->slotid == -1)
		{
			nextSlot = i;
			break;
		}
	}

	arrayP->nextSlot = nextSlot;
	arrayP->numSlots++;

	/* initialize some things */
	slot->slotid = slotId;
	slot->fullXid = InvalidFullTransactionId;
	slot->startTimestamp = 0;
	slot->distributedXid = InvalidDistributedTransactionId;
	slot->segmateSync = 0;
	/* Remember the writer proc for IsCurrentTransactionIdForReader */
	slot->writer_proc = MyProc;
	slot->writer_xact = MyPgXact;

	LWLockRelease(SharedSnapshotLock);

	return slot;
}

void
GetSlotTableDebugInfo(void **snapshotArray, int *maxSlots)
{
	*snapshotArray = (void *)sharedSnapshotArray;
	*maxSlots = sharedSnapshotArray->maxSlots;
}

/*
 * Used by "reader" qExecs to find the slot in the sharedsnapshotArray with the
 * specified slotId.  In general, we should always be able to find the specified
 * slot unless something unexpected has happened.  If the slot is not found,
 * NULL is returned.
 * returned.
 *
 * MPP-4599: retry in the same pattern as the writer.
 */
static SharedSnapshotSlot *
SharedSnapshotLookup(int32 slotId)
{
	SharedSnapshotSlot *slot = NULL;
	volatile SharedSnapshotStruct *arrayP = sharedSnapshotArray;
	int retryCount = gp_snapshotadd_timeout * 10; /* .1 s per wait */
	int index;

	for (;;)
	{
		CHECK_FOR_INTERRUPTS();

		LWLockAcquire(SharedSnapshotLock, LW_SHARED);

		for (index=0; index < arrayP->maxSlots; index++)
		{
			SharedSnapshotSlot *testSlot;

			testSlot = &arrayP->slots[index];

			if (testSlot->slotindex > arrayP->maxSlots)
				elog(ERROR, "Shared Local Snapshots Array appears corrupted: %s", SharedSnapshotDump());

			if (testSlot->slotid == slotId)
			{
				slot = testSlot;
				break;
			}
		}

		LWLockRelease(SharedSnapshotLock);

		if (slot != NULL)
		{
			break;
		}
		else
		{
			if (retryCount > 0)
			{
				retryCount--;

				pg_usleep(100000); /* 100ms, wait gp_snapshotadd_timeout seconds max. */
			}
			else
			{
				break;
			}
		}
	}

	return slot;
}


/*
 * Used by the "writer" qExec to "release" the slot it had been using.
 *
 */
void
SharedSnapshotRemove(volatile SharedSnapshotSlot *slot, char *creatorDescription)
{
	int slotId = slot->slotid;

	LWLockAcquire(SharedSnapshotLock, LW_EXCLUSIVE);

	/* determine if we need to modify the next available slot to use.  we
	 * only do this if our slotindex is lower than the existing one.
	 */
	if (sharedSnapshotArray->nextSlot == -1 || slot->slotindex < sharedSnapshotArray->nextSlot)
	{
		if (slot->slotindex > sharedSnapshotArray->maxSlots)
			elog(ERROR, "Shared Local Snapshots slot has a bogus slotindex: %d. slot array dump: %s",
				 slot->slotindex, SharedSnapshotDump());

		sharedSnapshotArray->nextSlot = slot->slotindex;
	}

	/* reset the slotid which marks it as being unused. */
	slot->slotid = -1;
	slot->fullXid = InvalidFullTransactionId;
	slot->startTimestamp = 0;
	slot->distributedXid = InvalidDistributedTransactionId;
	slot->segmateSync = 0;

	sharedSnapshotArray->numSlots -= 1;

	LWLockRelease(SharedSnapshotLock);

	elog((Debug_print_full_dtm ? LOG : DEBUG5),"SharedSnapshotRemove removed slot for slotId = %d, creator = %s (address %p)",
		 slotId, creatorDescription, SharedLocalSnapshotSlot);
}

void
addSharedSnapshot(char *creatorDescription, int id)
{
	SharedLocalSnapshotSlot = SharedSnapshotAdd(id);

	elog((Debug_print_full_dtm ? LOG : DEBUG5),"%s added Shared Local Snapshot slot for gp_session_id = %d (address %p)",
		 creatorDescription, id, SharedLocalSnapshotSlot);
}

void
lookupSharedSnapshot(char *lookerDescription, char *creatorDescription, int id)
{
	SharedSnapshotSlot *slot;

	slot = SharedSnapshotLookup(id);

	if (slot == NULL)
	{
		LWLockAcquire(SharedSnapshotLock, LW_SHARED);
		ereport(ERROR,
				(errmsg("%s could not find Shared Local Snapshot!",
						lookerDescription),
				 errdetail("Tried to find a shared snapshot slot with id: %d "
						   "and found none. Shared Local Snapshots dump: %s", id,
						   SharedSnapshotDump()),
				 errhint("Either this %s was created before the %s or the %s died.",
						 lookerDescription, creatorDescription, creatorDescription)));
	}

	SharedLocalSnapshotSlot = slot;

	elog((Debug_print_full_dtm ? LOG : DEBUG5),"%s found Shared Local Snapshot slot for gp_session_id = %d created by %s (address %p)",
		 lookerDescription, id, creatorDescription, SharedLocalSnapshotSlot);
}

/*
 * Dump the shared local snapshot, so that the readers can pick it up.
 */
void
dumpSharedLocalSnapshot_forCursor(void)
{
	ResourceOwner oldowner;
	SharedSnapshotSlot *src = NULL;

	Assert(Gp_role == GP_ROLE_DISPATCH || (Gp_role == GP_ROLE_EXECUTE && Gp_is_writer));
	Assert(SharedLocalSnapshotSlot != NULL);

	if (DumpResOwner == NULL)
		DumpResOwner = ResourceOwnerCreate(NULL, "SharedSnapshotDumpResOwner");

	LWLockAcquire(SharedLocalSnapshotSlot->slotLock, LW_EXCLUSIVE);

	created_dump = true;
	oldowner = CurrentResourceOwner;
	CurrentResourceOwner = DumpResOwner;

	src = (SharedSnapshotSlot *)SharedLocalSnapshotSlot;

	Size sz;
	dsm_segment *segment;
	int id = src->cur_dump_id;
	volatile SnapshotDump *pDump = &src->dump[id];
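	/*
	 * The dump array is used as a small ring buffer: once SNAPSHOTDUMPARRAYSZ
	 * dumps have been written, the oldest entry's DSM segment is detached
	 * below and the entry is overwritten with the new dump.
	 */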

	if (pDump->segment)
		dsm_detach(pDump->segment);

	sz = EstimateSnapshotSpace(&src->snapshot);
	segment = dsm_create(sz, 0);

	char *ptr = dsm_segment_address(segment);
	SerializeSnapshot(&src->snapshot, ptr);

	pDump->segment = segment;
	pDump->handle = dsm_segment_handle(segment);
	pDump->segmateSync = src->segmateSync;
	pDump->distributedXid = src->distributedXid;
	pDump->localXid = src->fullXid;

	elog(LOG, "Dump syncmate : %u snapshot to slot %d", src->segmateSync, id);

	src->cur_dump_id =
		(src->cur_dump_id + 1) % SNAPSHOTDUMPARRAYSZ;

	CurrentResourceOwner = oldowner;
	LWLockRelease(SharedLocalSnapshotSlot->slotLock);
}

void
readSharedLocalSnapshot_forCursor(Snapshot snapshot, DtxContext distributedTransactionContext)
{
	bool found;
	SharedSnapshotSlot *src = NULL;
	FullTransactionId localXid;

	Assert(Gp_role == GP_ROLE_EXECUTE);
	Assert(!Gp_is_writer);
	Assert(SharedLocalSnapshotSlot != NULL);
	Assert(snapshot->xip != NULL);

	SIMPLE_FAULT_INJECTOR("before_read_shared_snapshot_for_cursor");

	if (dumpHtab == NULL)
	{
		HASHCTL     hash_ctl;
		memset(&hash_ctl, 0, sizeof(hash_ctl));
		hash_ctl.keysize = sizeof(uint32);
		hash_ctl.entrysize = sizeof(DumpEntry);
		hash_ctl.hcxt = TopTransactionContext;
		dumpHtab = hash_create("snapshot dump",
		                       DUMP_HASH_SZ,
		                       &hash_ctl,
		                       HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
	}

	/* check segmate in local memory, only sync from shared memory once */
	DumpEntry *entry = hash_search(dumpHtab, &QEDtxContextInfo.segmateSync, HASH_ENTER, &found);
	volatile SnapshotDump *pDump = NULL;

	if (!found)
	{
		src = (SharedSnapshotSlot *)SharedLocalSnapshotSlot;
		LWLockAcquire(src->slotLock, LW_SHARED);
		int search_finish_id = src->cur_dump_id;
		int search_iter = search_finish_id;

		do {
			if (search_iter < 0)
				search_iter = SNAPSHOTDUMPARRAYSZ - 1;

			if (src->dump[search_iter].segmateSync == QEDtxContextInfo.segmateSync &&
				src->dump[search_iter].distributedXid == QEDtxContextInfo.distributedXid)
			{
				pDump = &src->dump[search_iter];
				found = true;
				break;
			}

			search_iter--;
		} while (search_iter != search_finish_id);

		if (!found)
		{
			LWLockRelease(SharedLocalSnapshotSlot->slotLock);
			LWLockAcquire(SharedSnapshotLock, LW_SHARED); /* For SharedSnapshotDump() */
			ereport(ERROR, (errmsg("could not find Shared Local Snapshot!"),
				errdetail("Tried to set the shared local snapshot slot with segmate: %u "
				          "and failed. Shared Local Snapshots dump: %s", QEDtxContextInfo.segmateSync,
				          SharedSnapshotDump())));
		}

		Assert(pDump->handle != 0);

		dsm_segment *segment = dsm_attach(pDump->handle);
		char *ptr = dsm_segment_address(segment);

		entry->snapshot = RestoreSnapshot(ptr);
		entry->localXid = pDump->localXid;

		dsm_detach(segment);

		LWLockRelease(SharedLocalSnapshotSlot->slotLock);
	}

	Snapshot dumpsnapshot = entry->snapshot;
	localXid = entry->localXid;
	snapshot->xmin = dumpsnapshot->xmin;
	snapshot->xmax = dumpsnapshot->xmax;
	snapshot->xcnt = dumpsnapshot->xcnt;

	memcpy(snapshot->xip, dumpsnapshot->xip, snapshot->xcnt * sizeof(TransactionId));

	/* zero out the slack in the xip-array */
	memset(snapshot->xip + snapshot->xcnt, 0, (xipEntryCount - snapshot->xcnt)*sizeof(TransactionId));

	snapshot->curcid = dumpsnapshot->curcid;

	SetSharedTransactionId_reader(
		localXid,
		snapshot->curcid,
		distributedTransactionContext);
}

/*
 * Free any shared snapshot files.
 */
void
AtEOXact_SharedSnapshot(void)
{
	dumpHtab = NULL;

	if (created_dump)
	{
		LWLockAcquire(SharedLocalSnapshotSlot->slotLock, LW_EXCLUSIVE);

		/* release dump dsm */
		ResourceOwnerRelease(DumpResOwner,
		                     RESOURCE_RELEASE_BEFORE_LOCKS,
		                     false, /* isCommit */
		                     true); /* isTopLevel */

		SharedLocalSnapshotSlot->cur_dump_id = 0;
		MemSet(SharedLocalSnapshotSlot->dump, 0, sizeof(SnapshotDump) * SNAPSHOTDUMPARRAYSZ);
		created_dump = false;
		LWLockRelease(SharedLocalSnapshotSlot->slotLock);
	}
}

/*
 * LogDistributedSnapshotInfo
 *   Log the distributed snapshot info in a given snapshot.
 *
 * The 'prefix' is used to prefix the log message.
 */
void
LogDistributedSnapshotInfo(Snapshot snapshot, const char *prefix)
{
	if (!IsMVCCSnapshot(snapshot))
		return;

	StringInfoData buf;
	initStringInfo(&buf);

	DistributedSnapshotWithLocalMapping *mapping =
		&(snapshot->distribSnapshotWithLocalMapping);

	DistributedSnapshot *ds = &mapping->ds;

	appendStringInfo(&buf, "%s Distributed snapshot info: "
			 "xminAllDistributedSnapshots="UINT64_FORMAT", distribSnapshotId=%d"
			 ", xmin="UINT64_FORMAT", xmax="UINT64_FORMAT", count=%d",
			 prefix,
			 ds->xminAllDistributedSnapshots,
			 ds->distribSnapshotId,
			 ds->xmin,
			 ds->xmax,
			 ds->count);

	appendStringInfoString(&buf, ", In progress array: {");

	for (int no = 0; no < ds->count; no++)
	{
		if (no != 0)
		{
			appendStringInfo(&buf, ", (dx"UINT64_FORMAT")",
					 ds->inProgressXidArray[no]);
		}
		else
		{
			appendStringInfo(&buf, " (dx"UINT64_FORMAT")",
					 ds->inProgressXidArray[no]);
		}
	}
	appendStringInfoString(&buf, "}");

	elog(LOG, "%s", buf.data);
	pfree(buf.data);
}
