greenplum cdbtm 源码
greenplum cdbtm 代码
文件路径:/src/backend/cdb/cdbtm.c
/*-------------------------------------------------------------------------
*
* cdbtm.c
* Provides routines for performing distributed transaction
*
* Portions Copyright (c) 2005-2009, Greenplum inc
* Portions Copyright (c) 2012-Present VMware, Inc. or its affiliates.
*
*
* IDENTIFICATION
* src/backend/cdb/cdbtm.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include <time.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include "catalog/pg_authid.h"
#include "cdb/cdbtm.h"
#include "libpq/libpq-be.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "storage/lmgr.h"
#include "storage/pmsignal.h"
#include "storage/s_lock.h"
#include "storage/shmem.h"
#include "storage/ipc.h"
#include "cdb/cdbdisp.h"
#include "cdb/cdbdisp_query.h"
#include "cdb/cdbdisp_dtx.h"
#include "cdb/cdbdispatchresult.h"
#include "cdb/cdbdtxcontextinfo.h"
#include "cdb/cdbvars.h"
#include "access/transam.h"
#include "access/xact.h"
#include "libpq-fe.h"
#include "libpq-int.h"
#include "cdb/cdbfts.h"
#include "lib/stringinfo.h"
#include "access/twophase.h"
#include "access/distributedlog.h"
#include "postmaster/postmaster.h"
#include "port/atomics.h"
#include "storage/procarray.h"
#include "cdb/cdbllize.h"
#include "utils/faultinjector.h"
#include "utils/guc.h"
#include "utils/fmgrprotos.h"
#include "utils/fmgroids.h"
#include "utils/session_state.h"
#include "utils/sharedsnapshot.h"
#include "utils/snapmgr.h"
#include "utils/memutils.h"
/*
 * Transaction manager control block.
 *
 * tmShmemInit() obtains one instance of this struct from ShmemInitStruct()
 * and publishes pointers to its members through the shm* variables
 * (shmDtmStarted, shmCleanupBackends, shmDtxRecoveryPid, ...).
 */
typedef struct TmControlBlock
{
/* Published as shmDtmStarted; set to false by tmShmemInit() when !IsUnderPostmaster. */
bool DtmStarted;
/* Published as shmCleanupBackends; set to false by tmShmemInit() when !IsUnderPostmaster. */
bool CleanupBackends;
/* Published as shmDtxRecoveryPid; set to 0 by tmShmemInit() when !IsUnderPostmaster. */
pid_t DtxRecoveryPid;
/* Published as shmDtxRecoveryEvents; starts as DTX_RECOVERY_EVENT_ABORT_PREPARED. */
DtxRecoveryEvent DtxRecoveryEvents;
/* Spinlock taken in this file around SetDtxRecoveryEvent() calls. */
slock_t DtxRecoveryEventLock;
/* Published as shmNextSnapshotId; set to 0 by tmShmemInit() when !IsUnderPostmaster. */
uint32 NextSnapshotId;
/* Published as shmNumCommittedGxacts; set to 0 by tmShmemInit() when !IsUnderPostmaster. */
int num_committed_xacts;
/* Spinlock taken around ShmemVariableCache->nextGxid / GxidCount updates. */
slock_t gxidGenLock;
/* Array [0..max_tm_gxacts-1] of DistributedTransactionId entries is appended starting here */
DistributedTransactionId committed_gxid_array[FLEXIBLE_ARRAY_MEMBER];
} TmControlBlock;
/*
 * Number of bytes needed for a TmControlBlock whose trailing
 * committed_gxid_array holds num_gxacts entries.
 */
#define TMCONTROLBLOCK_BYTES(num_gxacts) \
(offsetof(TmControlBlock, committed_gxid_array) + sizeof(DistributedTransactionId) * (num_gxacts))
/* Defined elsewhere; when set, DTX protocol dispatches are reported at INFO level. */
extern bool Test_print_direct_dispatch_info;
/*
 * Base delay, in milliseconds, between phase-2 broadcast retries.  The retry
 * loops multiply it by a factor between 1 and 50 that grows with the retry
 * count.
 */
#define DTX_PHASE2_SLEEP_TIME_BETWEEN_RETRIES_MSECS 100
/* Pointers into the shared TmControlBlock; assigned by tmShmemInit(). */
uint32 *shmNextSnapshotId;
slock_t *shmGxidGenLock;
/* Capacity used to size the shared committed gxid array; overwritten by tmShmemInit(). */
int max_tm_gxacts = 100;
/* Number of gxids bumpGxid() adds to ShmemVariableCache->GxidCount per call. */
int gp_gxid_prefetch_num;
/* Half of gp_gxid_prefetch_num; at or below this many remaining gxids a prefetch is requested. */
#define GXID_PRETCH_THRESHOLD (gp_gxid_prefetch_num>>1)
/*
 * errdetail() clause reporting the current distributed transaction id and
 * DTX state; the state is reported as 0 when MyTmGxactLocal is NULL.
 */
#define TM_ERRDETAIL (errdetail("gid=" UINT64_FORMAT ", state=%s", \
getDistributedTransactionId(),\
DtxStateToString(MyTmGxactLocal ? MyTmGxactLocal->state : 0)))
/*
 * Flag options related to the txnOptions field of PQsendGpQuery.
 * mppTxnOptions() builds the value; the mppTxOptions_* / isMppTxOptions_*
 * helpers decode it.
 */
/*
 * bit 1 is for statement wants DTX transaction
 * bits 2-4 for iso level
 * bit 5 is for read-only
 * bit 6 is for explicit BEGIN
 */
#define GP_OPT_NEED_DTX 0x0001
#define GP_OPT_ISOLATION_LEVEL_MASK 0x000E
#define GP_OPT_READ_UNCOMMITTED (1 << 1)
#define GP_OPT_READ_COMMITTED (2 << 1)
#define GP_OPT_REPEATABLE_READ (3 << 1)
#define GP_OPT_SERIALIZABLE (4 << 1)
#define GP_OPT_READ_ONLY 0x0010
#define GP_OPT_EXPLICT_BEGIN 0x0020
/*=========================================================================
* FUNCTIONS PROTOTYPES
*/
/* Phase-1 and phase-2 broadcast helpers. */
static void doPrepareTransaction(void);
static void doInsertForgetCommitted(void);
static void doNotifyingOnePhaseCommit(void);
static void doNotifyingCommitPrepared(void);
static void doNotifyingAbort(void);
static void retryAbortPrepared(void);
static void doQEDistributedExplicitBegin();
/* Local DTX state management. */
static void currentDtxActivate(void);
static void setCurrentDtxState(DtxState state);
static bool isDtxQueryDispatcher(void);
/* Handlers for DTX protocol commands identified by gid. */
static void performDtxProtocolCommitPrepared(const char *gid, bool raiseErrorIfNotFound);
static void performDtxProtocolAbortPrepared(const char *gid, bool raiseErrorIfNotFound);
static void sendWaitGxidsToQD(List *waitGxids);
/* Defined outside this file. */
extern void GpDropTempTables(void);
/*
 * setDistributedTransactionContext
 *		Store a new value in DistributedTransactionContext.
 *
 * The change is logged before it takes effect: at LOG level when
 * Debug_print_full_dtm is set, at DEBUG5 otherwise.
 */
void
setDistributedTransactionContext(DtxContext context)
{
	int			logLevel = Debug_print_full_dtm ? LOG : DEBUG5;

	elog(logLevel,
		 "Setting DistributedTransactionContext to '%s'",
		 DtxContextToString(context));

	DistributedTransactionContext = context;
}
/*
 * requireDistributedTransactionContext
 *		Raise FATAL unless DistributedTransactionContext currently equals
 *		requiredCurrentContext.
 */
static void
requireDistributedTransactionContext(DtxContext requiredCurrentContext)
{
	/* Nothing to report when the context already matches. */
	if (DistributedTransactionContext == requiredCurrentContext)
		return;

	elog(FATAL, "Expected segment distributed transaction context to be '%s', found '%s'",
		 DtxContextToString(requiredCurrentContext),
		 DtxContextToString(DistributedTransactionContext));
}
/*
 * isDtxContext
 *		Report whether DistributedTransactionContext is anything other than
 *		DTX_CONTEXT_LOCAL_ONLY.
 */
static bool
isDtxContext(void)
{
	if (DistributedTransactionContext == DTX_CONTEXT_LOCAL_ONLY)
		return false;

	return true;
}
/*=========================================================================
* VISIBLE FUNCTIONS
*/
/*
 * getDistributedTransactionId
 *		Return MyTmGxact->gxid when isDtxContext() holds, and
 *		InvalidDistributedTransactionId otherwise.
 */
DistributedTransactionId
getDistributedTransactionId(void)
{
	return isDtxContext() ? MyTmGxact->gxid : InvalidDistributedTransactionId;
}
/*
 * getDistributedTransactionIdentifier
 *		Write the identifier of the current distributed transaction into id.
 *
 * Returns true after formatting the identifier with dtxFormGid().  When we
 * are not in a DTX context, or no gxid has been assigned, the first
 * TMGIDSIZE bytes of id are zeroed and false is returned, so id must point
 * to at least TMGIDSIZE bytes.
 */
bool
getDistributedTransactionIdentifier(char *id)
{
	Assert(MyTmGxactLocal != NULL);

	/* No distributed transaction to describe: hand back an all-zero buffer. */
	if (!isDtxContext() || MyTmGxact->gxid == InvalidDistributedTransactionId)
	{
		MemSet(id, 0, TMGIDSIZE);
		return false;
	}

	dtxFormGid(id, MyTmGxact->gxid);
	return true;
}
/*
 * isPreparedDtxTransaction
 *		Report whether the local DTX state is DTX_STATE_PREPARED.
 *
 * Assert-enabled builds additionally check that this state is only seen on
 * the dispatcher in the DTX_CONTEXT_QD_DISTRIBUTED_CAPABLE context.
 */
bool
isPreparedDtxTransaction(void)
{
	bool		prepared = (MyTmGxactLocal->state == DTX_STATE_PREPARED);

	AssertImply(prepared,
				(Gp_role == GP_ROLE_DISPATCH &&
				 DistributedTransactionContext == DTX_CONTEXT_QD_DISTRIBUTED_CAPABLE));

	return prepared;
}
/*
 * isCurrentDtxActivated
 *		Report whether the local DTX state has left DTX_STATE_NONE.
 *
 * The executor can avoid starting a distributed transaction if it knows that
 * the current dtx is clean and we aren't in a user-started global transaction.
 */
bool
isCurrentDtxActivated(void)
{
	if (MyTmGxactLocal->state == DTX_STATE_NONE)
		return false;

	return true;
}
/*
 * bumpGxid
 *		Make another batch of gp_gxid_prefetch_num distributed transaction
 *		ids available in ShmemVariableCache->GxidCount.
 *
 * The whole operation is serialized by GxidBumpLock.  The new upper limit is
 * computed under shmGxidGenLock, handed to XLogPutNextGxid() with the spin
 * lock released, and only then is GxidCount increased (again under the spin
 * lock).  The function returns without doing anything if it had to wait for
 * GxidBumpLock or if GxidCount is already above GXID_PRETCH_THRESHOLD.
 * It PANICs when the new limit gets within gp_gxid_prefetch_num of
 * LastDistributedTransactionId.
 */
void
bumpGxid()
{
DistributedTransactionId nextLimit;
uint32 nextCount;
/*
* Someone else might have done this, so if the lock was blocked and is now
* free, just return.
*/
if (!LWLockAcquireOrWait(GxidBumpLock, LW_EXCLUSIVE))
return;
/*
* No need to bump if there have been enough gxid. This is possible if
* another bump finished before we tried to lock GxidBumpLock.
*/
if (ShmemVariableCache->GxidCount > GXID_PRETCH_THRESHOLD)
{
LWLockRelease(GxidBumpLock);
return;
}
/* nextLimit should be always multiple of gp_gxid_prefetch_num. */
SpinLockAcquire(shmGxidGenLock);
nextCount = ShmemVariableCache->GxidCount + gp_gxid_prefetch_num;
nextLimit = ShmemVariableCache->nextGxid + nextCount;
/* NOTE(review): this PANIC is raised while shmGxidGenLock is still held. */
if (nextLimit >= (LastDistributedTransactionId - gp_gxid_prefetch_num))
ereport(PANIC,
(errmsg("Will soon reach the limit of global transactions: "UINT64_FORMAT,
ShmemVariableCache->nextGxid)));
SpinLockRelease(shmGxidGenLock);
/* It might be time-consuming, so put it out of the spin locking section. */
XLogPutNextGxid(nextLimit);
SpinLockAcquire(shmGxidGenLock);
ShmemVariableCache->GxidCount += gp_gxid_prefetch_num;
SpinLockRelease(shmGxidGenLock);
/* Only one bump operation one time, so lock till the end. */
LWLockRelease(GxidBumpLock);
}
/*
 * currentDtxActivate
 *		Assign a gxid to MyTmGxact and move the local DTX state to
 *		DTX_STATE_ACTIVE_DISTRIBUTED.
 *
 * When the number of prefetched gxids has dropped to GXID_PRETCH_THRESHOLD or
 * below and no bump has been requested yet, the DTX_RECOVERY_EVENT_BUMP_GXID
 * event is set and the postmaster is signalled with
 * PMSIGNAL_WAKEN_DTX_RECOVERY.  The gxid itself is then taken from
 * ShmemVariableCache under shmGxidGenLock, calling bumpGxid() directly
 * whenever the pool is observed to be empty.  Finally the session id is
 * recorded and the gxid is passed to GxactLockTableInsert().
 */
static void
currentDtxActivate(void)
{
bool signal_dtx_recovery;
/* Unlocked pre-check; the event flag is re-tested under the spin lock below. */
if (ShmemVariableCache->GxidCount <= GXID_PRETCH_THRESHOLD &&
(GetDtxRecoveryEvent() & DTX_RECOVERY_EVENT_BUMP_GXID) == 0)
{
signal_dtx_recovery = false;
SpinLockAcquire(shmDtxRecoveryEventLock);
if ((GetDtxRecoveryEvent() & DTX_RECOVERY_EVENT_BUMP_GXID) == 0)
{
/* dtx recovery is not notified, wake up dtx recovery to prefetch. */
SetDtxRecoveryEvent(DTX_RECOVERY_EVENT_BUMP_GXID);
signal_dtx_recovery = true;
}
SpinLockRelease(shmDtxRecoveryEventLock);
/* Signal only after the spin lock has been released. */
if (signal_dtx_recovery)
SendPostmasterSignal(PMSIGNAL_WAKEN_DTX_RECOVERY);
}
/*
* We need to retry since in theory even after gxid bumping, we still can
* not get an available gxid if other backends quickly consume all of the
* generated gxid. This mostly happens when the system is with high
* performance and load but with low gxid prefetch batch size. It should be
* rare so far, but in case in the future...
*/
for(;;)
{
/* Unlocked peek; the authoritative test is repeated under the spin lock. */
if (unlikely(ShmemVariableCache->GxidCount == 0))
bumpGxid();
SpinLockAcquire(shmGxidGenLock);
if (ShmemVariableCache->GxidCount > 0)
{
MyTmGxact->gxid = ShmemVariableCache->nextGxid++;
ShmemVariableCache->GxidCount--;
SpinLockRelease(shmGxidGenLock);
break;
}
SpinLockRelease(shmGxidGenLock);
}
MyTmGxact->sessionId = gp_session_id;
setCurrentDtxState(DTX_STATE_ACTIVE_DISTRIBUTED);
GxactLockTableInsert(MyTmGxact->gxid);
}
/*
 * setCurrentDtxState
 *		Store state in MyTmGxactLocal->state.  MyTmGxactLocal is dereferenced
 *		without a NULL check.
 */
static void
setCurrentDtxState(DtxState state)
{
MyTmGxactLocal->state = state;
}
/*
 * getCurrentDtxState
 *		Return the local DTX state, or DTX_STATE_NONE when MyTmGxactLocal has
 *		not been set up.
 */
DtxState
getCurrentDtxState(void)
{
	if (MyTmGxactLocal == NULL)
		return DTX_STATE_NONE;

	return MyTmGxactLocal->state;
}
/*
 * notifyCommittedDtxTransactionIsNeeded
 *		Decide whether notifyCommittedDtxTransaction() has anything to do.
 *
 * Returns true only when the context is DTX_CONTEXT_QD_DISTRIBUTED_CAPABLE
 * and the current dtx has been activated; each negative outcome is logged at
 * DTM_DEBUG5.
 */
bool
notifyCommittedDtxTransactionIsNeeded(void)
{
	bool		needed = false;

	if (DistributedTransactionContext != DTX_CONTEXT_QD_DISTRIBUTED_CAPABLE)
	{
		elog(DTM_DEBUG5, "notifyCommittedDtxTransaction nothing to do (DistributedTransactionContext = '%s')",
			 DtxContextToString(DistributedTransactionContext));
	}
	else if (!isCurrentDtxActivated())
	{
		elog(DTM_DEBUG5, "notifyCommittedDtxTransaction nothing to do (two phase not activated)");
	}
	else
	{
		needed = true;
	}

	return needed;
}
/*
* Notify committed a global transaction, called by user commit
* or by CommitTransaction
*/
void
notifyCommittedDtxTransaction(void)
{
ListCell *l;
Assert(Gp_role == GP_ROLE_DISPATCH);
Assert(DistributedTransactionContext == DTX_CONTEXT_QD_DISTRIBUTED_CAPABLE);
Assert(isCurrentDtxActivated());
switch(MyTmGxactLocal->state)
{
case DTX_STATE_INSERTED_COMMITTED:
doNotifyingCommitPrepared();
break;
case DTX_STATE_NOTIFYING_ONE_PHASE_COMMIT:
case DTX_STATE_ONE_PHASE_COMMIT:
/* Already notified for one phase commit or no need to notify. */
break;
default:
;
TransactionId xid = GetTopTransactionIdIfAny();
bool markXidCommitted = TransactionIdIsValid(xid);
/*
* If local commit xlog is written we can not throw error and then
* abort transaction (that will cause panic) so directly panic
* for that case with more details.
*/
ereport(markXidCommitted ? PANIC : ERROR,
(errmsg("Unexpected DTX state"), TM_ERRDETAIL));
}
foreach(l, MyTmGxactLocal->waitGxids)
{
GxactLockTableWait(lfirst_int(l));
}
}
/*
 * setupDtxTransaction
 *		Make sure the current transaction has an active distributed
 *		transaction, activating one if needed.
 *
 * Raises ERROR when called outside a transaction, or when the resulting DTX
 * state is not DTX_STATE_ACTIVE_DISTRIBUTED.
 */
void
setupDtxTransaction(void)
{
	if (!IsTransactionState())
	{
		elog(ERROR, "DTM transaction is not active");
	}

	if (!isCurrentDtxActivated())
	{
		currentDtxActivate();
	}

	if (MyTmGxactLocal->state == DTX_STATE_ACTIVE_DISTRIBUTED)
		return;

	elog(ERROR, "DTM transaction state (%s) is invalid", DtxStateToString(MyTmGxactLocal->state));
}
/*
 * doDispatchSubtransactionInternalCmd
 *		Dispatch an internal sub-transaction call made from a UDF to the
 *		segments.  The calls are BeginInternalSubTransaction,
 *		ReleaseCurrentSubTransaction and
 *		RollbackAndReleaseCurrentSubTransaction.
 *
 * Returns false, after a WARNING, when the writer gang of the current global
 * transaction has been lost.  Otherwise the command is dispatched to every
 * entry of cdbcomponent_getCdbComponentsList(); a failed dispatch is raised
 * as ERROR, so a normal return from that path is always true.
 */
bool
doDispatchSubtransactionInternalCmd(DtxProtocolCommand cmdType)
{
	char		gid[TMGIDSIZE];
	char	   *ctxInfo;
	int			ctxInfoLen = 0;
	bool		dispatched;

	if (currentGxactWriterGangLost())
	{
		ereport(WARNING,
				(errmsg("writer gang of current global transaction is lost")));
		return false;
	}

	/* Beginning an internal subtransaction may be what starts the dtx. */
	if (!isCurrentDtxActivated() &&
		cmdType == DTX_PROTOCOL_COMMAND_SUBTRANSACTION_BEGIN_INTERNAL)
		currentDtxActivate();

	ctxInfo = qdSerializeDtxContextInfo(&ctxInfoLen,
										false /* wantSnapshot */ ,
										false /* inCursor */ ,
										mppTxnOptions(true),
										"doDispatchSubtransactionInternalCmd");

	dtxFormGid(gid, getDistributedTransactionId());

	/* send a DTM command to others to tell them about the transaction */
	dispatched = doDispatchDtxProtocolCommand(cmdType,
											  gid,
											  /* raiseError */ true,
											  cdbcomponent_getCdbComponentsList(),
											  ctxInfo, ctxInfoLen);

	if (!dispatched)
		ereport(ERROR,
				(errmsg("dispatching subtransaction internal command failed for gid = \"%s\" due to error", gid)));

	return dispatched;
}
/*
 * doPrepareTransaction
 *		Broadcast DTX_PROTOCOL_COMMAND_PREPARE for the current distributed
 *		transaction.
 *
 * The local state moves ACTIVE_DISTRIBUTED -> PREPARING before the dispatch
 * and PREPARING -> PREPARED once it has succeeded.  A failed broadcast is
 * raised as ERROR, leaving the state at PREPARING.
 */
static void
doPrepareTransaction(void)
{
	bool		prepareOk;

	CHECK_FOR_INTERRUPTS();

	elog(DTM_DEBUG5, "doPrepareTransaction entering in state = %s",
		 DtxStateToString(MyTmGxactLocal->state));

	/*
	 * Cancels are held off while the prepare is being dispatched; the state
	 * change is wrapped too, for good measure.
	 */
	HOLD_INTERRUPTS();

	Assert(MyTmGxactLocal->state == DTX_STATE_ACTIVE_DISTRIBUTED);
	setCurrentDtxState(DTX_STATE_PREPARING);

	elog(DTM_DEBUG5, "doPrepareTransaction moved to state = %s", DtxStateToString(MyTmGxactLocal->state));

	Assert(MyTmGxactLocal->dtxSegments != NIL);
	prepareOk = currentDtxDispatchProtocolCommand(DTX_PROTOCOL_COMMAND_PREPARE, true);

	/* The dispatched statement has been cleaned up: allow cancels again. */
	RESUME_INTERRUPTS();

	if (!prepareOk)
		ereport(ERROR,
				(errmsg("The distributed transaction 'Prepare' broadcast failed to one or more segments"),
				 TM_ERRDETAIL));

	ereport(DTM_DEBUG5,
			(errmsg("The distributed transaction 'Prepare' broadcast succeeded to the segments"),
			 TM_ERRDETAIL));

	Assert(MyTmGxactLocal->state == DTX_STATE_PREPARING);
	setCurrentDtxState(DTX_STATE_PREPARED);

	SIMPLE_FAULT_INJECTOR("dtm_broadcast_prepare");

	elog(DTM_DEBUG5, "doPrepareTransaction leaving in state = %s", DtxStateToString(MyTmGxactLocal->state));
}
/*
* Insert FORGET COMMITTED into the xlog.
*/
static void
doInsertForgetCommitted(void)
{
elog(DTM_DEBUG5, "doInsertForgetCommitted entering in state = %s", DtxStateToString(MyTmGxactLocal->state));
setCurrentDtxState(DTX_STATE_INSERTING_FORGET_COMMITTED);
RecordDistributedForgetCommitted(getDistributedTransactionId());
setCurrentDtxState(DTX_STATE_INSERTED_FORGET_COMMITTED);
MyTmGxact->includeInCkpt = false;
}
/*
 * doNotifyingOnePhaseCommit
 *		Broadcast DTX_PROTOCOL_COMMAND_COMMIT_ONEPHASE to the segments of the
 *		current distributed transaction.
 *
 * Does nothing when no segments are recorded.  Otherwise the state moves
 * ONE_PHASE_COMMIT -> NOTIFYING_ONE_PHASE_COMMIT and the command is
 * dispatched; a failure that the dispatch did not already throw is raised as
 * ERROR here.
 */
static void
doNotifyingOnePhaseCommit(void)
{
	if (MyTmGxactLocal->dtxSegments == NIL)
		return;

	elog(DTM_DEBUG5, "doNotifyingOnePhaseCommit entering in state = %s", DtxStateToString(MyTmGxactLocal->state));

	Assert(MyTmGxactLocal->state == DTX_STATE_ONE_PHASE_COMMIT);
	setCurrentDtxState(DTX_STATE_NOTIFYING_ONE_PHASE_COMMIT);

	if (currentDtxDispatchProtocolCommand(DTX_PROTOCOL_COMMAND_COMMIT_ONEPHASE, true))
		return;

	/* If error is not thrown after failure then we have to throw it. */
	Assert(MyTmGxactLocal->state == DTX_STATE_NOTIFYING_ONE_PHASE_COMMIT);
	ereport(ERROR,
			(errmsg("one phase commit notification failed"),
			 TM_ERRDETAIL));
}
/*
 * doNotifyingCommitPrepared
 *		Broadcast COMMIT PREPARED for the current distributed transaction and
 *		then write the distributed forget record.
 *
 * The state moves INSERTED_COMMITTED -> NOTIFYING_COMMIT_PREPARED.  The first
 * broadcast runs inside PG_TRY so that a thrown error is swallowed and
 * treated as failure.  On failure the state becomes RETRY_COMMIT_PREPARED,
 * the context becomes DTX_CONTEXT_QD_RETRY_PHASE_2, and
 * DTX_PROTOCOL_COMMAND_RETRY_COMMIT_PREPARED is re-broadcast (after resetting
 * all gangs and sleeping) until it succeeds or dtx_phase2_retry_second
 * seconds have passed.  If it still has not succeeded the function PANICs.
 *
 * TwophaseCommitLock is held in shared mode from before the first broadcast
 * until after doInsertForgetCommitted().
 */
static void
doNotifyingCommitPrepared(void)
{
bool succeeded;
int retry = 0;
volatile int savedInterruptHoldoffCount;
/* Memory context to switch back to after catching an error. */
MemoryContext oldcontext = CurrentMemoryContext;;
time_t retry_time_start;
bool retry_timedout;
elog(DTM_DEBUG5, "doNotifyingCommitPrepared entering in state = %s", DtxStateToString(MyTmGxactLocal->state));
Assert(MyTmGxactLocal->state == DTX_STATE_INSERTED_COMMITTED);
setCurrentDtxState(DTX_STATE_NOTIFYING_COMMIT_PREPARED);
SIMPLE_FAULT_INJECTOR("dtm_broadcast_commit_prepared");
/*
* Acquire TwophaseCommitLock in shared mode to block any GPDB restore
* points from being created while commit prepared messages are being
* broadcasted.
*/
LWLockAcquire(TwophaseCommitLock, LW_SHARED);
savedInterruptHoldoffCount = InterruptHoldoffCount;
Assert(MyTmGxactLocal->dtxSegments != NIL);
PG_TRY();
{
succeeded = currentDtxDispatchProtocolCommand(DTX_PROTOCOL_COMMAND_COMMIT_PREPARED, true);
}
PG_CATCH();
{
/*
* restore the previous value, which is reset to 0 in errfinish.
*/
MemoryContextSwitchTo(oldcontext);
InterruptHoldoffCount = savedInterruptHoldoffCount;
succeeded = false;
FlushErrorState();
}
PG_END_TRY();
if (!succeeded)
{
Assert(MyTmGxactLocal->state == DTX_STATE_NOTIFYING_COMMIT_PREPARED);
ereport(DTM_DEBUG5,
(errmsg("marking retry needed for distributed transaction "
"'Commit Prepared' broadcast to the segments"),
TM_ERRDETAIL));
setCurrentDtxState(DTX_STATE_RETRY_COMMIT_PREPARED);
setDistributedTransactionContext(DTX_CONTEXT_QD_RETRY_PHASE_2);
}
/* A retry window of 0 seconds means the retry loop is skipped entirely. */
retry_timedout = (dtx_phase2_retry_second == 0) ? true : false;
retry_time_start = time(NULL);
while (!succeeded && !retry_timedout)
{
retry++;
/*
* Sleep for some time before retry to avoid too many retries for some
* scenarios that retry completes soon. Also delay for longer when
* retry fails more and more times.
*/
pg_usleep(DTX_PHASE2_SLEEP_TIME_BETWEEN_RETRIES_MSECS * 1000 *
Min(Max(retry - 10, 1), 50));
ereport(WARNING,
(errmsg("the distributed transaction 'Commit Prepared' broadcast "
"failed to one or more segments. Retrying ... try %d", retry),
TM_ERRDETAIL));
/*
* We must succeed in delivering the commit to all segment instances,
* or any failed segment instances must be marked INVALID.
*/
elog(NOTICE, "Releasing segworker group to retry broadcast.");
ResetAllGangs();
savedInterruptHoldoffCount = InterruptHoldoffCount;
PG_TRY();
{
succeeded = currentDtxDispatchProtocolCommand(DTX_PROTOCOL_COMMAND_RETRY_COMMIT_PREPARED, true);
}
PG_CATCH();
{
/*
* restore the previous value, which is reset to 0 in errfinish.
*/
MemoryContextSwitchTo(oldcontext);
InterruptHoldoffCount = savedInterruptHoldoffCount;
succeeded = false;
FlushErrorState();
}
PG_END_TRY();
/* The timeout is only evaluated after an attempt, so at least one retry runs. */
if ((time(NULL) - retry_time_start) > dtx_phase2_retry_second)
retry_timedout = true;
}
/* PANIC is raised while TwophaseCommitLock is still held. */
if (!succeeded)
ereport(PANIC,
(errmsg("unable to complete 'Commit Prepared' broadcast"),
TM_ERRDETAIL));
ereport(DTM_DEBUG5,
(errmsg("the distributed transaction 'Commit Prepared' broadcast succeeded to all the segments"),
TM_ERRDETAIL));
SIMPLE_FAULT_INJECTOR("dtm_before_insert_forget_comitted");
doInsertForgetCommitted();
/*
* We release the TwophaseCommitLock only after writing our distributed
* forget record which signifies that all query executors have written
* their commit prepared records.
*/
LWLockRelease(TwophaseCommitLock);
}
/*
 * retryAbortPrepared
 *		Keep broadcasting DTX_PROTOCOL_COMMAND_RETRY_ABORT_PREPARED until it
 *		succeeds or dtx_phase2_retry_second seconds have elapsed.
 *
 * Every attempt resets all gangs and re-targets MyTmGxactLocal->dtxSegments
 * at cdbcomponent_getCdbComponentsList().  An error thrown by the dispatch is
 * caught and counted as a failed attempt.  From the second attempt on, the
 * loop sleeps for DTX_PHASE2_SLEEP_TIME_BETWEEN_RETRIES_MSECS multiplied by a
 * factor between 1 and 50 that grows with the retry count.  No attempt is
 * made at all when dtx_phase2_retry_second is 0.
 *
 * If the broadcast never succeeds, the gangs are reset once more,
 * DTX_RECOVERY_EVENT_ABORT_PREPARED is set, the postmaster is signalled with
 * PMSIGNAL_WAKEN_DTX_RECOVERY, and a WARNING is raised.  The "succeeded"
 * debug message is emitted only when the broadcast really succeeded.
 */
static void
retryAbortPrepared(void)
{
	int			retry = 0;
	bool		succeeded = false;
	volatile int savedInterruptHoldoffCount;
	MemoryContext oldcontext = CurrentMemoryContext;
	time_t		retry_time_start;
	bool		retry_timedout;

	/* A retry window of 0 seconds disables the loop entirely. */
	retry_timedout = (dtx_phase2_retry_second == 0) ? true : false;
	retry_time_start = time(NULL);

	while (!succeeded && !retry_timedout)
	{
		retry++;

		/*
		 * By deallocating the gang, we will force a new gang to connect to
		 * all the segment instances. And, we will abort the transactions in
		 * the segments. What's left are possibly prepared transactions.
		 */
		if (retry > 1)
		{
			elog(NOTICE, "Releasing segworker groups to retry broadcast.");

			/*
			 * Sleep for some time before retry to avoid too many retries for
			 * some scenarios that retry completes soon. Also delay for longer
			 * when retry fails more and more times.
			 */
			pg_usleep(DTX_PHASE2_SLEEP_TIME_BETWEEN_RETRIES_MSECS * 1000 *
					  Min(Max(retry - 10, 1), 50));
		}

		ResetAllGangs();

		savedInterruptHoldoffCount = InterruptHoldoffCount;

		PG_TRY();
		{
			MyTmGxactLocal->dtxSegments = cdbcomponent_getCdbComponentsList();
			succeeded = currentDtxDispatchProtocolCommand(DTX_PROTOCOL_COMMAND_RETRY_ABORT_PREPARED, true);
			if (!succeeded)
				ereport(WARNING,
						(errmsg("the distributed transaction 'Abort' broadcast "
								"failed to one or more segments. Retrying ... try %d", retry),
						 TM_ERRDETAIL));
		}
		PG_CATCH();
		{
			/*
			 * restore the previous value, which is reset to 0 in errfinish.
			 */
			MemoryContextSwitchTo(oldcontext);
			InterruptHoldoffCount = savedInterruptHoldoffCount;
			succeeded = false;
			FlushErrorState();
		}
		PG_END_TRY();

		/* The timeout is evaluated after the attempt, so one always runs. */
		if ((time(NULL) - retry_time_start) > dtx_phase2_retry_second)
			retry_timedout = true;
	}

	if (!succeeded)
	{
		ResetAllGangs();

		/* Hand the unfinished abort over to the dtx recovery process. */
		SpinLockAcquire(shmDtxRecoveryEventLock);
		SetDtxRecoveryEvent(DTX_RECOVERY_EVENT_ABORT_PREPARED);
		SpinLockRelease(shmDtxRecoveryEventLock);
		SendPostmasterSignal(PMSIGNAL_WAKEN_DTX_RECOVERY);

		ereport(WARNING,
				(errmsg("unable to complete 'Abort' broadcast. The dtx recovery"
						" process will continue trying that."),
				 TM_ERRDETAIL));

		/* Do not fall through to the success message below. */
		return;
	}

	ereport(DTM_DEBUG5,
			(errmsg("The distributed transaction 'Abort' broadcast succeeded to all the segments"),
			 TM_ERRDETAIL));
}
/*
 * doNotifyingAbort
 *		Broadcast the abort command that matches the current DTX state.
 *
 * - NOTIFYING_ABORT_NO_PREPARED: dispatch ABORT_NO_PREPARED with raiseError
 *   false; on failure warn and reset all gangs.
 * - NOTIFYING_ABORT_SOME_PREPARED / NOTIFYING_ABORT_PREPARED: dispatch the
 *   matching command inside PG_TRY; on failure switch to
 *   RETRY_ABORT_PREPARED and fall through to the retry case.
 * - RETRY_ABORT_PREPARED: call retryAbortPrepared().
 * Any other state is a PANIC.
 */
static void
doNotifyingAbort(void)
{
bool succeeded;
volatile int savedInterruptHoldoffCount;
/* Memory context to switch back to after catching an error. */
MemoryContext oldcontext = CurrentMemoryContext;
elog(DTM_DEBUG5, "doNotifyingAborted entering in state = %s", DtxStateToString(MyTmGxactLocal->state));
switch (MyTmGxactLocal->state)
{
case DTX_STATE_NOTIFYING_ABORT_NO_PREPARED:
{
/*
* In some cases, dtmPreCommand said two phase commit is needed, but some errors
* occur before the command is actually dispatched, no need to dispatch DTX for
* such cases.
*/
succeeded = currentDtxDispatchProtocolCommand(DTX_PROTOCOL_COMMAND_ABORT_NO_PREPARED, false);
if (!succeeded)
{
ereport(WARNING,
(errmsg("The distributed transaction 'Abort' broadcast failed to one or more segments"),
TM_ERRDETAIL));
/*
* Reset the dispatch logic and disconnect from any segment
* that didn't respond to our abort.
*/
elog(NOTICE, "Releasing segworker groups to finish aborting the transaction.");
ResetAllGangs();
}
else
{
ereport(DTM_DEBUG5,
(errmsg("The distributed transaction 'Abort' broadcast succeeded to all the segments"),
TM_ERRDETAIL));
}
break;
}
case DTX_STATE_NOTIFYING_ABORT_SOME_PREPARED:
case DTX_STATE_NOTIFYING_ABORT_PREPARED:
{
DtxProtocolCommand dtxProtocolCommand;
/* NOTE(review): abortString is assigned below but never read in this function. */
char *abortString;
if (MyTmGxactLocal->state == DTX_STATE_NOTIFYING_ABORT_SOME_PREPARED)
{
dtxProtocolCommand = DTX_PROTOCOL_COMMAND_ABORT_SOME_PREPARED;
abortString = "Abort [Prepared]";
}
else
{
dtxProtocolCommand = DTX_PROTOCOL_COMMAND_ABORT_PREPARED;
abortString = "Abort Prepared";
}
savedInterruptHoldoffCount = InterruptHoldoffCount;
PG_TRY();
{
succeeded = currentDtxDispatchProtocolCommand(dtxProtocolCommand, true);
}
PG_CATCH();
{
/*
* restore the previous value, which is reset to 0 in errfinish.
*/
MemoryContextSwitchTo(oldcontext);
InterruptHoldoffCount = savedInterruptHoldoffCount;
succeeded = false;
FlushErrorState();
}
PG_END_TRY();
if (succeeded)
break;
/* First attempt failed: switch to the retry state and fall into the retry case. */
ereport(WARNING,
(errmsg("the distributed transaction broadcast failed "
"to one or more segments"),
TM_ERRDETAIL));
setCurrentDtxState(DTX_STATE_RETRY_ABORT_PREPARED);
setDistributedTransactionContext(DTX_CONTEXT_QD_RETRY_PHASE_2);
}
/* FALL THRU */
case DTX_STATE_RETRY_ABORT_PREPARED:
retryAbortPrepared();
break;
default:
elog(PANIC, "Unexpected Dtx state: %s", DtxStateToString(MyTmGxactLocal->state));
}
SIMPLE_FAULT_INJECTOR("dtm_broadcast_abort_prepared");
Assert(CurrentDtxIsRollingback());
}
/*
 * prepareDtxTransaction
 *		Prepare a global transaction; called by user commit or by
 *		CommitTransaction.
 *
 * Nothing is dispatched unless the context is
 * DTX_CONTEXT_QD_DISTRIBUTED_CAPABLE and the dtx has been activated (in the
 * latter case the gxact is reset).  Otherwise the transaction is either
 * committed in one phase or PREPARE is broadcast to the segments.
 */
void
prepareDtxTransaction(void)
{
	TransactionId topXid = GetTopTransactionIdIfAny();
	bool		qdHasXid = TransactionIdIsValid(topXid);
	bool		onePhaseOk;

	if (DistributedTransactionContext != DTX_CONTEXT_QD_DISTRIBUTED_CAPABLE)
	{
		elog(DTM_DEBUG5, "prepareDtxTransaction nothing to do (DistributedTransactionContext = '%s')",
			 DtxContextToString(DistributedTransactionContext));
		Assert(Gp_role != GP_ROLE_DISPATCH || MyTmGxact->gxid == InvalidDistributedTransactionId);
		return;
	}

	if (!isCurrentDtxActivated())
	{
		Assert(MyTmGxactLocal->state == DTX_STATE_NONE);
		Assert(Gp_role != GP_ROLE_DISPATCH || MyTmGxact->gxid == InvalidDistributedTransactionId);
		resetTmGxact();
		return;
	}

	/*
	 * One-phase commit is possible when no segment wrote xlog for this
	 * transaction, or when fewer than two segments were involved and the QD
	 * has no local XID either.  In every other case PREPARE TRANSACTION has
	 * to be broadcast to the segments.
	 */
	onePhaseOk = !TopXactExecutorDidWriteXLog() ||
		(!qdHasXid && list_length(MyTmGxactLocal->dtxSegments) < 2);

	if (!onePhaseOk)
	{
		elog(DTM_DEBUG5,
			 "prepareDtxTransaction called with state = %s",
			 DtxStateToString(MyTmGxactLocal->state));
		Assert(MyTmGxactLocal->state == DTX_STATE_ACTIVE_DISTRIBUTED);
		doPrepareTransaction();
		return;
	}

	setCurrentDtxState(DTX_STATE_ONE_PHASE_COMMIT);

	/*
	 * The QEs are told about the one phase commit before the local
	 * transaction xlog is recorded, because a failure at this point can still
	 * be turned into an abort.
	 */
	doNotifyingOnePhaseCommit();
}
/*
 * rollback a global transaction, called by user rollback
 * or by AbortTransaction during Postgres automatic rollback
 *
 * Returns without dispatching anything unless the context is
 * DTX_CONTEXT_QD_DISTRIBUTED_CAPABLE and the dtx has been activated.  The
 * switch below maps the current DTX state onto one of the "notifying abort"
 * / "retry abort" states (or returns / errors out for states that need no
 * broadcast), after which doNotifyingAbort() performs the broadcast.  When
 * the process is exiting (proc_exit_inprogress) no broadcast is attempted.
 */
void
rollbackDtxTransaction(void)
{
if (DistributedTransactionContext != DTX_CONTEXT_QD_DISTRIBUTED_CAPABLE)
{
elog(DTM_DEBUG5, "rollbackDtxTransaction nothing to do (DistributedTransactionContext = '%s')",
DtxContextToString(DistributedTransactionContext));
return;
}
if (!isCurrentDtxActivated())
{
elog(DTM_DEBUG5, "rollbackDtxTransaction nothing to do (two phase not activate)");
return;
}
ereport(DTM_DEBUG5,
(errmsg("rollbackDtxTransaction called"),
TM_ERRDETAIL));
switch (MyTmGxactLocal->state)
{
/* Nothing has been prepared yet in these states. */
case DTX_STATE_ACTIVE_DISTRIBUTED:
case DTX_STATE_ONE_PHASE_COMMIT:
case DTX_STATE_NOTIFYING_ONE_PHASE_COMMIT:
if (!MyTmGxactLocal->dtxSegments || currentGxactWriterGangLost())
{
ereport(DTM_DEBUG5,
(errmsg("The distributed transaction 'Abort' broadcast was omitted (segworker group already dead)"),
TM_ERRDETAIL));
return;
}
setCurrentDtxState(DTX_STATE_NOTIFYING_ABORT_NO_PREPARED);
break;
case DTX_STATE_PREPARING:
/*
* The writer gang is detected broken during preparing, then it has been destroyed
* in AtAbort_DispatcherState(). In this way, we will create a new writer gang to
* do the rollback. As this new writer gang is in DTX_CONTEXT_LOCAL_ONLY context,
* we need to dispatch DTX_STATE_RETRY_ABORT_PREPARED command instead of
* DTX_STATE_NOTIFYING_ABORT_SOME_PREPARED.
*/
if (currentGxactWriterGangLost())
setCurrentDtxState(DTX_STATE_RETRY_ABORT_PREPARED);
else
setCurrentDtxState(DTX_STATE_NOTIFYING_ABORT_SOME_PREPARED);
break;
case DTX_STATE_PREPARED:
setCurrentDtxState(DTX_STATE_NOTIFYING_ABORT_PREPARED);
break;
/* Reached again while an abort broadcast was already in progress. */
case DTX_STATE_NOTIFYING_ABORT_NO_PREPARED:
/*
* By deallocating the gang, we will force a new gang to connect
* to all the segment instances. And, we will abort the
* transactions in the segments.
*/
elog(NOTICE, "Releasing segworker groups to finish aborting the transaction.");
ResetAllGangs();
return;
case DTX_STATE_NOTIFYING_ABORT_SOME_PREPARED:
case DTX_STATE_NOTIFYING_ABORT_PREPARED:
ereport(FATAL,
(errmsg("Unable to complete the 'Abort Prepared' broadcast"),
TM_ERRDETAIL));
break;
/* States in which rollback is not expected; only logged at debug level. */
case DTX_STATE_NOTIFYING_COMMIT_PREPARED:
case DTX_STATE_INSERTING_COMMITTED:
case DTX_STATE_INSERTED_COMMITTED:
case DTX_STATE_INSERTING_FORGET_COMMITTED:
case DTX_STATE_INSERTED_FORGET_COMMITTED:
case DTX_STATE_RETRY_COMMIT_PREPARED:
case DTX_STATE_RETRY_ABORT_PREPARED:
elog(DTM_DEBUG5, "rollbackDtxTransaction dtx state \"%s\" not expected here",
DtxStateToString(MyTmGxactLocal->state));
return;
default:
elog(PANIC, "Unrecognized dtx state: %d",
(int) MyTmGxactLocal->state);
break;
}
/* Only the states that still require an abort broadcast get past the switch. */
Assert(MyTmGxactLocal->state == DTX_STATE_NOTIFYING_ABORT_NO_PREPARED ||
MyTmGxactLocal->state == DTX_STATE_NOTIFYING_ABORT_SOME_PREPARED ||
MyTmGxactLocal->state == DTX_STATE_RETRY_ABORT_PREPARED ||
MyTmGxactLocal->state == DTX_STATE_NOTIFYING_ABORT_PREPARED);
/*
* if the process is in the middle of blowing up... then we don't do
* anything here. we can resolve any in-doubt transactions later.
*
* We can't dispatch -- but we *do* need to free up shared-memory entries.
*/
if (proc_exit_inprogress)
{
/*
* Unable to complete distributed abort broadcast with possible
* prepared transactions...
*/
if (MyTmGxactLocal->state == DTX_STATE_NOTIFYING_ABORT_SOME_PREPARED ||
MyTmGxactLocal->state == DTX_STATE_RETRY_ABORT_PREPARED ||
MyTmGxactLocal->state == DTX_STATE_NOTIFYING_ABORT_PREPARED)
{
ereport(FATAL,
(errmsg("Unable to complete the 'Abort Prepared' broadcast"),
TM_ERRDETAIL));
}
Assert(MyTmGxactLocal->state == DTX_STATE_NOTIFYING_ABORT_NO_PREPARED);
/*
* By deallocating the gang, we will force a new gang to connect to
* all the segment instances. And, we will abort the transactions in
* the segments.
*/
ResetAllGangs();
return;
}
doNotifyingAbort();
return;
}
/*
 * tmShmemSize
 *		Return the MAXALIGN'ed number of shared memory bytes needed for a
 *		TmControlBlock holding max_tm_gxacts committed gxids.
 */
int
tmShmemSize(void)
{
	size_t		rawBytes = TMCONTROLBLOCK_BYTES(max_tm_gxacts);

	return MAXALIGN(rawBytes);
}
/*
 * tmShmemInit
 *		Create or attach to the transaction manager's shared control block
 *		and point the shm* variables at its members.
 *
 * Should be called only once from postmaster; the result is inherited by all
 * postgres processes.
 */
void
tmShmemInit(void)
{
	TmControlBlock *tmcb;
	bool		alreadyExists;

	if (Gp_role == GP_ROLE_DISPATCH && max_prepared_xacts < MaxConnections)
		elog(WARNING, "Better set max_prepared_transactions greater than max_connections");

	/*
	 * max_prepared_transactions is a guc which is postmaster-startup setable
	 * -- it can only be updated by restarting the system. Global transactions
	 * will all use two-phase commit, so the number of global transactions is
	 * bound to the number of prepared.
	 *
	 * Note on master, it is possible that some prepared xacts just use partial
	 * gang so on QD the total prepared xacts might be quite large but it is
	 * limited by max_connections since one QD should only have one 2pc one
	 * time, so if we set max_tm_gxacts as max_prepared_transactions as before,
	 * shmCommittedGxactArray might not be able to accommodate committed but
	 * not forgotten transactions (standby recovery will fail if encountering
	 * this issue) if max_prepared_transactions is smaller than max_connections
	 * (though this is not suggested). Not to mention that
	 * max_prepared_transactions might be inconsistent between master/standby
	 * and segments (though this is not suggested).
	 *
	 * We can assign MaxBackends (MaxConnections should be fine also but let's
	 * be conservative) to max_tm_gxacts on master/standby to tolerate various
	 * configuration combinations of max_prepared_transactions and
	 * max_connections. max_tm_gxacts is used on the coordinator only, and the
	 * coordinator might be accessed in dispatch mode or utility mode.
	 */
	max_tm_gxacts = IS_QUERY_DISPATCHER() ? MaxBackends : 0;

	tmcb = (TmControlBlock *) ShmemInitStruct("Transaction manager", tmShmemSize(), &alreadyExists);
	if (tmcb == NULL)
		elog(FATAL, "could not initialize transaction manager share memory");

	/* Only the creator of the shared memory sets up the locks. */
	if (!alreadyExists)
	{
		ShmemVariableCache->latestCompletedGxid = InvalidDistributedTransactionId;
		SpinLockInit(&tmcb->DtxRecoveryEventLock);
		SpinLockInit(&tmcb->gxidGenLock);
	}

	/* Publish the members of the control block. */
	shmDtmStarted = &tmcb->DtmStarted;
	shmCleanupBackends = &tmcb->CleanupBackends;
	shmDtxRecoveryPid = &tmcb->DtxRecoveryPid;
	shmDtxRecoveryEvents = &tmcb->DtxRecoveryEvents;
	shmDtxRecoveryEventLock = &tmcb->DtxRecoveryEventLock;
	shmNextSnapshotId = &tmcb->NextSnapshotId;
	shmGxidGenLock = &tmcb->gxidGenLock;
	shmNumCommittedGxacts = &tmcb->num_committed_xacts;
	shmCommittedGxidArray = &tmcb->committed_gxid_array[0];

	/* Backends running under the postmaster leave the shared values alone. */
	if (IsUnderPostmaster)
		return;

	/* Initialize the shared memory area. */
	*shmNextSnapshotId = 0;
	*shmDtmStarted = false;
	*shmCleanupBackends = false;
	*shmDtxRecoveryPid = 0;
	*shmDtxRecoveryEvents = DTX_RECOVERY_EVENT_ABORT_PREPARED;
	*shmNumCommittedGxacts = 0;
}
/*
 * mppTxnOptions
 *		Build the GP_OPT_* flag word that directs the remote segdb QE process
 *		to perform any needed transaction commands before or after the
 *		statement.
 *
 * The word carries needDtx, the current XactIsoLevel, XactReadOnly, and the
 * explicit-BEGIN marker (only when the dtx is activated and an explicit
 * BEGIN has been remembered).  Inputs and result are logged at DTM_DEBUG5.
 */
int
mppTxnOptions(bool needDtx)
{
	int			flags = 0;

	elog(DTM_DEBUG5,
		 "mppTxnOptions DefaultXactIsoLevel = %s, DefaultXactReadOnly = %s, XactIsoLevel = %s, XactReadOnly = %s.",
		 IsoLevelAsUpperString(DefaultXactIsoLevel), (DefaultXactReadOnly ? "true" : "false"),
		 IsoLevelAsUpperString(XactIsoLevel), (XactReadOnly ? "true" : "false"));

	if (needDtx)
		flags |= GP_OPT_NEED_DTX;

	/* Encode the isolation level; an unknown level contributes no bits. */
	switch (XactIsoLevel)
	{
		case XACT_READ_COMMITTED:
			flags |= GP_OPT_READ_COMMITTED;
			break;
		case XACT_REPEATABLE_READ:
			flags |= GP_OPT_REPEATABLE_READ;
			break;
		case XACT_SERIALIZABLE:
			flags |= GP_OPT_SERIALIZABLE;
			break;
		case XACT_READ_UNCOMMITTED:
			flags |= GP_OPT_READ_UNCOMMITTED;
			break;
		default:
			break;
	}

	if (XactReadOnly)
		flags |= GP_OPT_READ_ONLY;

	if (isCurrentDtxActivated() && MyTmGxactLocal->explicitBeginRemembered)
		flags |= GP_OPT_EXPLICT_BEGIN;

	elog(DTM_DEBUG5,
		 "mppTxnOptions txnOptions = 0x%x, needDtx = %s, explicitBegin = %s, isoLevel = %s, readOnly = %s.",
		 flags,
		 (isMppTxOptions_NeedDtx(flags) ? "true" : "false"), (isMppTxOptions_ExplicitBegin(flags) ? "true" : "false"),
		 IsoLevelAsUpperString(mppTxOptions_IsoLevel(flags)), (isMppTxOptions_ReadOnly(flags) ? "true" : "false"));

	return flags;
}
/*
 * mppTxOptions_IsoLevel
 *		Decode the isolation level bits of txnOptions into the matching
 *		XACT_* constant.
 *
 * Raises ERROR when the isolation bits match none of the known levels.
 */
int
mppTxOptions_IsoLevel(int txnOptions)
{
	switch (txnOptions & GP_OPT_ISOLATION_LEVEL_MASK)
	{
		case GP_OPT_SERIALIZABLE:
			return XACT_SERIALIZABLE;
		case GP_OPT_REPEATABLE_READ:
			return XACT_REPEATABLE_READ;
		case GP_OPT_READ_COMMITTED:
			return XACT_READ_COMMITTED;
		case GP_OPT_READ_UNCOMMITTED:
			return XACT_READ_UNCOMMITTED;
		default:
			break;
	}

	/* QD must set transaction isolation level */
	elog(ERROR, "transaction options from QD did not include isolation level");
}
/*
 * isMppTxOptions_ReadOnly
 *		Return whether the GP_OPT_READ_ONLY flag is set in txnOptions.
 */
bool
isMppTxOptions_ReadOnly(int txnOptions)
{
	if (txnOptions & GP_OPT_READ_ONLY)
		return true;

	return false;
}
/*
 * isMppTxOptions_NeedDtx
 *		Return whether the GP_OPT_NEED_DTX flag is set in txnOptions.
 */
bool
isMppTxOptions_NeedDtx(int txnOptions)
{
	if (txnOptions & GP_OPT_NEED_DTX)
		return true;

	return false;
}
/*
 * isMppTxOptions_ExplicitBegin
 *		Return whether the GP_OPT_EXPLICT_BEGIN flag is set in txnOptions.
 */
bool
isMppTxOptions_ExplicitBegin(int txnOptions)
{
	if (txnOptions & GP_OPT_EXPLICT_BEGIN)
		return true;

	return false;
}
/*=========================================================================
* HELPER FUNCTIONS
*/
/*
 * qsort()-style comparator for plain ints: yields -1, 0 or 1 when the first
 * value is smaller than, equal to, or larger than the second.
 */
static int
compare_int(const void *va, const void *vb)
{
	const int  *lhs = (const int *) va;
	const int  *rhs = (const int *) vb;

	return (*lhs > *rhs) - (*lhs < *rhs);
}
/*
 * currentDtxDispatchProtocolCommand:
 *   Dispatch a DTX protocol command for the current distributed transaction.
 *
 * The gid is formed from the current distributed transaction id and the
 * command goes to the segments recorded in MyTmGxactLocal->dtxSegments,
 * without any serialized DTX context info.  Returns the result of
 * doDispatchDtxProtocolCommand().
 */
bool
currentDtxDispatchProtocolCommand(DtxProtocolCommand dtxProtocolCommand, bool raiseError)
{
	char		currentGid[TMGIDSIZE];
	DistributedTransactionId currentGxid = getDistributedTransactionId();

	dtxFormGid(currentGid, currentGxid);

	return doDispatchDtxProtocolCommand(dtxProtocolCommand,
										currentGid,
										raiseError,
										MyTmGxactLocal->dtxSegments,
										NULL,
										0);
}
/*
 * doDispatchDtxProtocolCommand:
 *   Dispatch one DTX protocol command for transaction `gid` to the segments
 *   in `dtxSegments` and report whether every segment acknowledged it.
 *
 * Parameters:
 *   dtxProtocolCommand  - the protocol command to send
 *   gid                 - distributed transaction identifier string
 *   raiseError          - when a QE error is reported: true re-throws it,
 *                         false only logs it
 *   dtxSegments         - segments to dispatch to; an empty list means there
 *                         is nothing to do
 *   serializedDtxContextInfo, serializedDtxContextInfoLen
 *                       - passed through unchanged to the dispatcher
 *
 * Returns true when dtxSegments is empty, or when no QE error was reported
 * and every result has status PGRES_COMMAND_OK or PGRES_TUPLES_OK with a
 * command status that matches the protocol command string.  Returns false
 * otherwise (including when the dispatcher returned no result array).
 *
 * Side effect: when any segment reports waited-for gxids, the sorted,
 * de-duplicated set replaces MyTmGxactLocal->waitGxids; the new list is
 * allocated in TopTransactionContext.
 */
bool
doDispatchDtxProtocolCommand(DtxProtocolCommand dtxProtocolCommand,
							 char *gid,
							 bool raiseError,
							 List *dtxSegments,
							 char *serializedDtxContextInfo,
							 int serializedDtxContextInfoLen)
{
	int			i,
				resultCount = 0,
				numOfFailed = 0;

	char	   *dtxProtocolCommandStr = NULL;

	struct pg_result **results;
	MemoryContext oldContext;
	ErrorData  *qeError = NULL;

	int		   *waitGxids = NULL;
	int			totalWaits = 0;

	if (!dtxSegments)
		return true;

	dtxProtocolCommandStr = DtxProtocolCommandToString(dtxProtocolCommand);

	if (Test_print_direct_dispatch_info)
		elog(INFO, "Distributed transaction command '%s' to %s",
			 dtxProtocolCommandStr,
			 segmentsToContentStr(dtxSegments));

	ereport(DTM_DEBUG5,
			(errmsg("dispatchDtxProtocolCommand: %d ('%s'), direct content #: %s",
					dtxProtocolCommand, dtxProtocolCommandStr,
					segmentsToContentStr(dtxSegments))));

	results = CdbDispatchDtxProtocolCommand(dtxProtocolCommand,
											dtxProtocolCommandStr,
											gid,
											&qeError, &resultCount, dtxSegments,
											serializedDtxContextInfo, serializedDtxContextInfoLen);

	if (qeError)
	{
		if (!raiseError)
		{
			ereport(LOG,
					(errmsg("DTM error (gathered results from cmd '%s')", dtxProtocolCommandStr),
					 errdetail("QE reported error: %s", qeError->message)));
		}
		else
		{
			FlushErrorState();
			ReThrowError(qeError);
		}
		return false;
	}

	if (results == NULL)
	{
		numOfFailed++;			/* If we got no results, we need to treat it
								 * as an error! */

		/* There is no array to walk; never index through a NULL pointer. */
		resultCount = 0;
	}

	for (i = 0; i < resultCount; i++)
	{
		char	   *cmdStatus;
		ExecStatusType resultStatus;

		/*
		 * note: PQresultStatus() is smart enough to deal with results[i] ==
		 * NULL
		 */
		resultStatus = PQresultStatus(results[i]);
		if (resultStatus != PGRES_COMMAND_OK &&
			resultStatus != PGRES_TUPLES_OK)
		{
			numOfFailed++;
		}
		else
		{
			/*
			 * success ? If an error happened during a transaction which
			 * hasn't already been caught when we try a prepare we'll get a
			 * rollback from our prepare ON ONE SEGMENT: so we go look at the
			 * status, otherwise we could issue a COMMIT when we don't want
			 * to!
			 */
			cmdStatus = PQcmdStatus(results[i]);

			elog(DEBUG3, "DTM: status message cmd '%s' [%d] result '%s'", dtxProtocolCommandStr, i, cmdStatus);
			if (strncmp(cmdStatus, dtxProtocolCommandStr, strlen(cmdStatus)) != 0)
			{
				/* failed */
				numOfFailed++;
			}
		}
	}

	/*
	 * Gather all the waited gxids from segments and remove the duplicates.
	 * A NULL entry (already counted as a failure above) carries no gxids and
	 * must not be dereferenced.
	 */
	for (i = 0; i < resultCount; i++)
	{
		if (results[i] != NULL)
			totalWaits += results[i]->nWaits;
	}

	if (totalWaits > 0)
		waitGxids = palloc(sizeof(int) * totalWaits);

	/* Second pass: copy the gxids out, then release each result. */
	totalWaits = 0;
	for (i = 0; i < resultCount; i++)
	{
		struct pg_result *result = results[i];

		if (result != NULL && result->nWaits > 0)
		{
			memcpy(&waitGxids[totalWaits], result->waitGxids, sizeof(int) * result->nWaits);
			totalWaits += result->nWaits;
		}
		PQclear(result);
	}

	if (totalWaits > 0)
	{
		int			lastRepeat = -1;

		if (MyTmGxactLocal->waitGxids)
		{
			list_free(MyTmGxactLocal->waitGxids);
			MyTmGxactLocal->waitGxids = NULL;
		}

		/* Sorting makes duplicates adjacent so one pass can skip them. */
		qsort(waitGxids, totalWaits, sizeof(int), compare_int);

		oldContext = MemoryContextSwitchTo(TopTransactionContext);
		for (i = 0; i < totalWaits; i++)
		{
			if (waitGxids[i] == lastRepeat)
				continue;
			MyTmGxactLocal->waitGxids = lappend_int(MyTmGxactLocal->waitGxids, waitGxids[i]);
			lastRepeat = waitGxids[i];
		}
		MemoryContextSwitchTo(oldContext);
	}

	if (waitGxids)
		pfree(waitGxids);

	if (results)
		pfree(results);

	return (numOfFailed == 0);
}
/*
 * dispatchDtxCommand:
 *   Dispatch the command string `cmd` with DF_NEED_TWO_PHASE and check every
 *   returned result.
 *
 * Returns false when the writer gang of the current global transaction is
 * lost (a WARNING is emitted), when no results come back, or when any result
 * has a status other than PGRES_COMMAND_OK/PGRES_TUPLES_OK or a command
 * status that does not match `cmd`.  Returns true otherwise.
 */
bool
dispatchDtxCommand(const char *cmd)
{
	CdbPgResults cdb_pgresults = {NULL, 0};
	int			failures = 0;
	int			idx;

	elog(DTM_DEBUG5, "dispatchDtxCommand: '%s'", cmd);

	if (currentGxactWriterGangLost())
	{
		ereport(WARNING,
				(errmsg("writer gang of current global transaction is lost")));
		return false;
	}

	CdbDispatchCommand(cmd, DF_NEED_TWO_PHASE, &cdb_pgresults);

	/* Getting no results at all is treated as an error. */
	if (cdb_pgresults.numResults == 0)
		return false;

	for (idx = 0; idx < cdb_pgresults.numResults; idx++)
	{
		ExecStatusType status;
		char	   *statusText;

		/* PQresultStatus() copes with a NULL result pointer. */
		status = PQresultStatus(cdb_pgresults.pg_results[idx]);
		if (status != PGRES_COMMAND_OK && status != PGRES_TUPLES_OK)
		{
			failures++;
			continue;
		}

		/*
		 * The status looks fine, but a segment may still have answered with
		 * something other than the command we sent (e.g. a rollback in
		 * response to a prepare), so compare the command status text too;
		 * otherwise we could issue a COMMIT when we don't want to.
		 */
		statusText = PQcmdStatus(cdb_pgresults.pg_results[idx]);
		elog(DEBUG3, "DTM: status message cmd '%s' [%d] result '%s'", cmd, idx, statusText);
		if (strncmp(statusText, cmd, strlen(statusText)) != 0)
			failures++;
	}

	cdbdisp_clearCdbPgResults(&cdb_pgresults);

	return (failures == 0);
}
/*
 * resetTmGxact:
 *   Reset the global transaction context of this backend: clears the shared
 *   MyTmGxact fields and the backend-local MyTmGxactLocal bookkeeping, frees
 *   the waitGxids list if there is one, and sets the DTX state to
 *   DTX_STATE_NONE.  The gxid must already be invalid.
 */
void
resetTmGxact(void)
{
	Assert(MyTmGxact->gxid == InvalidDistributedTransactionId);

	/* shared part */
	MyTmGxact->xminDistributedSnapshot = InvalidDistributedTransactionId;
	MyTmGxact->includeInCkpt = false;
	MyTmGxact->sessionId = 0;

	/* backend-local flags */
	MyTmGxactLocal->explicitBeginRemembered = false;
	MyTmGxactLocal->writerGangLost = false;
	MyTmGxactLocal->isOnePhaseCommit = false;

	/* backend-local segment tracking */
	MyTmGxactLocal->dtxSegmentsMap = NULL;
	MyTmGxactLocal->dtxSegments = NIL;

	if (MyTmGxactLocal->waitGxids != NULL)
		list_free(MyTmGxactLocal->waitGxids);
	MyTmGxactLocal->waitGxids = NULL;

	setCurrentDtxState(DTX_STATE_NONE);
}
/*
 * getNextDistributedXactStatus:
 *   Iterator step over allDistributedXactStatus->statusArray.
 *
 * While the cursor (`next`) is below `count`, stores the address of the
 * current entry in *distributedXactStatus, advances the cursor and returns
 * true.  Once the cursor has reached `count`, returns false and leaves
 * *distributedXactStatus untouched.
 */
bool
getNextDistributedXactStatus(TMGALLXACTSTATUS *allDistributedXactStatus, TMGXACTSTATUS **distributedXactStatus)
{
	if (allDistributedXactStatus->next < allDistributedXactStatus->count)
	{
		*distributedXactStatus =
			&allDistributedXactStatus->statusArray[allDistributedXactStatus->next++];
		return true;
	}

	return false;
}
/*
 * insertingDistributedCommitted:
 *   Move the current distributed transaction from DTX_STATE_PREPARED to
 *   DTX_STATE_INSERTING_COMMITTED.
 *
 * serializes commits with checkpoint info using PGPROC->inCommit
 * (NOTE(review): that serialization is not visible in this function;
 * presumably the caller handles it -- confirm).
 */
void
insertingDistributedCommitted(void)
{
/* Log the state we are entering with, for DTM debugging. */
elog(DTM_DEBUG5,
"insertingDistributedCommitted entering in state = %s",
DtxStateToString(MyTmGxactLocal->state));
/* The only legal predecessor state is PREPARED. */
Assert(MyTmGxactLocal->state == DTX_STATE_PREPARED);
setCurrentDtxState(DTX_STATE_INSERTING_COMMITTED);
}
/*
 * insertedDistributedCommitted:
 *   Move the current distributed transaction from
 *   DTX_STATE_INSERTING_COMMITTED to DTX_STATE_INSERTED_COMMITTED and, on the
 *   query dispatcher, flag it for inclusion in checkpoints.
 */
void
insertedDistributedCommitted(void)
{
SIMPLE_FAULT_INJECTOR("start_insertedDistributedCommitted");
ereport(DTM_DEBUG5,
(errmsg("entering insertedDistributedCommitted"),
TM_ERRDETAIL));
/* The only legal predecessor state is INSERTING_COMMITTED. */
Assert(MyTmGxactLocal->state == DTX_STATE_INSERTING_COMMITTED);
setCurrentDtxState(DTX_STATE_INSERTED_COMMITTED);
/*
 * We don't have to hold ProcArrayLock here because includeInCkpt is used
 * during creating checkpoint and we already set delayChkpt before we got here.
 */
Assert(MyPgXact->delayChkpt);
/* Only the query dispatcher sets the flag. */
if (IS_QUERY_DISPATCHER())
MyTmGxact->includeInCkpt = true;
}
/*
 * verify_shared_snapshot_ready:
 *   Make sure a writer gang is alive and has written the shared snapshot so
 *   that reader gangs can access it.
 *
 * This is done by dispatching "set gp_write_shared_snapshot=true"; the
 * function does nothing else useful.  It was added as a helper for cursor
 * query execution, since in MPP cursor queries don't use writer gangs, but
 * it could serve other purposes as well.
 *
 * cid is the command id of the calling command; the dispatch is skipped when
 * it equals MySessionState->latestCursorCommandId.
 *
 * See assign_gp_write_shared_snapshot(...) for more information.
 */
void
verify_shared_snapshot_ready(int cid)
{
	Assert (Gp_role == GP_ROLE_DISPATCH);

	/*
	 * One cursor/bind/exec command may trigger several dispatches (e.g.
	 * DECLARE s1 CURSOR FOR SELECT * FROM test WHERE a=(SELECT max(b) FROM test)),
	 * all of them targeting reader gangs only.  They are read-only and belong
	 * to a single user command, so sharing one snapshot is fine: do the work
	 * only the first time this command id is seen.
	 */
	if (MySessionState->latestCursorCommandId != cid)
	{
		int			dispatchFlags = DF_CANCEL_ON_ERROR |
									DF_WITH_SNAPSHOT |
									DF_NEED_TWO_PHASE;

		CdbDispatchCommand("set gp_write_shared_snapshot=true",
						   dispatchFlags,
						   NULL);

		dumpSharedLocalSnapshot_forCursor();
		MySessionState->latestCursorCommandId = cid;
	}
}
/*
 * assign_gp_write_shared_snapshot:
 *   Assign hook that forces the writer QE to write the shared snapshot.  It
 *   gets called after a "set gp_write_shared_snapshot=<true/false>" is
 *   executed in dispatch mode.
 *
 * Nothing happens unless newval is true and this process runs in
 * GP_ROLE_EXECUTE.  `extra` is not used.
 *
 * See verify_shared_snapshot_ready(...) for additional information.
 */
void
assign_gp_write_shared_snapshot(bool newval, void *extra)
{
#if FALSE
	elog(DEBUG1, "SET gp_write_shared_snapshot: %s",
		 (newval ? "true" : "false"));
#endif

	/*
	 * Only act on "true": a "false" could be part of a ROLLBACK, in which
	 * case we don't want to set the snapshot.  Only executors do the work.
	 */
	if (!newval || Gp_role != GP_ROLE_EXECUTE)
		return;

	PushActiveSnapshot(GetTransactionSnapshot());

	/* Only the writer dumps the shared local snapshot. */
	if (Gp_is_writer)
		dumpSharedLocalSnapshot_forCursor();

	PopActiveSnapshot();
}
/*
 * doQEDistributedExplicitBegin:
 *   Run an explicit BEGIN as a complete command: start a command, begin a
 *   transaction block, and finish the command.
 */
static void
doQEDistributedExplicitBegin(void)
{
	/*
	 * Start a command.
	 */
	StartTransactionCommand();

	/* Here is the explicit BEGIN. */
	BeginTransactionBlock();

	/*
	 * Finish the BEGIN command.  It will leave the explicit transaction
	 * in-progress.
	 */
	CommitTransactionCommand();
}
static bool
isDtxQueryDispatcher(void)
{
bool isDtmStarted;
bool isSharedLocalSnapshotSlotPresent;
isDtmStarted = (shmDtmStarted != NULL && *shmDtmStarted);
isSharedLocalSnapshotSlotPresent = (SharedLocalSnapshotSlot != NULL);
return (Gp_role == GP_ROLE_DISPATCH &&
isDtmStarted &&
isSharedLocalSnapshotSlotPresent);
}
/*
 * setupRegularDtxContext:
 *   Called prior to handling a request that comes to the QD, or a utility
 *   request to a QE.  Sets up the distributed transaction context value and
 *   does some basic error checking.
 *
 * Essentially:
 *   - DTX_CONTEXT_QD_DISTRIBUTED_CAPABLE and
 *     DTX_CONTEXT_QE_TWO_PHASE_EXPLICIT_WRITER are left as they are;
 *   - any other context MUST be DTX_CONTEXT_LOCAL_ONLY (an ERROR is raised
 *     otherwise), and it is converted to DTX_CONTEXT_QD_DISTRIBUTED_CAPABLE
 *     when isDtxQueryDispatcher() reports that this process is a
 *     distributed-capable QD.
 */
void
setupRegularDtxContext(void)
{
	/*
	 * In these two contexts we simply continue (the explicit-writer case is
	 * allowed for copy...???).  Do not touch QEDtxContextInfo, etc.
	 */
	bool		keepContext =
		(DistributedTransactionContext == DTX_CONTEXT_QD_DISTRIBUTED_CAPABLE ||
		 DistributedTransactionContext == DTX_CONTEXT_QE_TWO_PHASE_EXPLICIT_WRITER);

	if (!keepContext)
	{
		if (DistributedTransactionContext != DTX_CONTEXT_LOCAL_ONLY)
		{
			/*
			 * we must be one of:
			 *
			 * DTX_CONTEXT_QD_RETRY_PHASE_2,
			 * DTX_CONTEXT_QE_ENTRY_DB_SINGLETON,
			 * DTX_CONTEXT_QE_AUTO_COMMIT_IMPLICIT,
			 * DTX_CONTEXT_QE_TWO_PHASE_IMPLICIT_WRITER,
			 * DTX_CONTEXT_QE_READER, DTX_CONTEXT_QE_PREPARED
			 */
			elog(ERROR, "setupRegularDtxContext finds unexpected DistributedTransactionContext = '%s'",
				 DtxContextToString(DistributedTransactionContext));
		}

		/* DistributedTransactionContext is DTX_CONTEXT_LOCAL_ONLY */
		Assert(QEDtxContextInfo.distributedXid == InvalidDistributedTransactionId);

		/*
		 * Determine if we are strictly local or a distributed capable QD.
		 */
		Assert(DistributedTransactionContext == DTX_CONTEXT_LOCAL_ONLY);

		if (isDtxQueryDispatcher())
			setDistributedTransactionContext(DTX_CONTEXT_QD_DISTRIBUTED_CAPABLE);
	}

	elog(DTM_DEBUG5, "setupRegularDtxContext leaving with DistributedTransactionContext = '%s'.",
		 DtxContextToString(DistributedTransactionContext));
}
/**
 * Called on the QE when a query to process has been received.
 *
 * This will set up all distributed transaction information and set the state appropriately.
 *
 * dtxContextInfo must not be NULL.  Its contents are copied into the global
 * QEDtxContextInfo, and DistributedTransactionContext is then moved to the
 * context matching this process's role (entry-db singleton, reader QE or
 * writer QE) and the transaction options received.  When Gp_role is not
 * GP_ROLE_EXECUTE the function returns early and the context stays
 * DTX_CONTEXT_LOCAL_ONLY.  Inconsistent combinations raise ERROR (or PANIC
 * for an unknown context value).
 */
void
setupQEDtxContext(DtxContextInfo *dtxContextInfo)
{
DistributedSnapshot *distributedSnapshot;
int txnOptions;
bool needDtx;
bool explicitBegin;
bool haveDistributedSnapshot;
bool isEntryDbSingleton = false;
bool isReaderQE = false;
bool isWriterQE = false;
bool isSharedLocalSnapshotSlotPresent;
Assert(dtxContextInfo != NULL);
/*
 * DTX Context Info (even when empty) only comes in QE requests.
 */
distributedSnapshot = &dtxContextInfo->distributedSnapshot;
/* Decode the option bits sent along with the request. */
txnOptions = dtxContextInfo->distributedTxnOptions;
needDtx = isMppTxOptions_NeedDtx(txnOptions);
explicitBegin = isMppTxOptions_ExplicitBegin(txnOptions);
haveDistributedSnapshot = dtxContextInfo->haveDistributedSnapshot;
isSharedLocalSnapshotSlotPresent = (SharedLocalSnapshotSlot != NULL);
/* Debug-only dump of all the inputs. */
if (DEBUG5 >= log_min_messages || Debug_print_full_dtm)
{
elog(DTM_DEBUG5,
"setupQEDtxContext inputs (part 1): Gp_role = %s, Gp_is_writer = %s, "
"txnOptions = 0x%x, needDtx = %s, explicitBegin = %s, isoLevel = %s, readOnly = %s, haveDistributedSnapshot = %s.",
role_to_string(Gp_role), (Gp_is_writer ? "true" : "false"), txnOptions,
(needDtx ? "true" : "false"), (explicitBegin ? "true" : "false"),
IsoLevelAsUpperString(mppTxOptions_IsoLevel(txnOptions)), (isMppTxOptions_ReadOnly(txnOptions) ? "true" : "false"),
(haveDistributedSnapshot ? "true" : "false"));
elog(DTM_DEBUG5,
"setupQEDtxContext inputs (part 2): distributedXid = "UINT64_FORMAT", isSharedLocalSnapshotSlotPresent = %s.",
dtxContextInfo->distributedXid,
(isSharedLocalSnapshotSlotPresent ? "true" : "false"));
if (haveDistributedSnapshot)
{
elog(DTM_DEBUG5,
"setupQEDtxContext inputs (part 2a): distributedXid = "UINT64_FORMAT", "
"distributedSnapshotData (xmin = "UINT64_FORMAT", xmax = "UINT64_FORMAT", xcnt = %u), distributedCommandId = %d",
dtxContextInfo->distributedXid,
distributedSnapshot->xmin, distributedSnapshot->xmax,
distributedSnapshot->count,
dtxContextInfo->curcid);
}
if (isSharedLocalSnapshotSlotPresent)
{
if (DTM_DEBUG5 >= log_min_messages)
{
/* Read the slot under its lock so the logged fields are consistent. */
LWLockAcquire(SharedLocalSnapshotSlot->slotLock, LW_SHARED);
elog(DTM_DEBUG5,
"setupQEDtxContext inputs (part 2b): shared local snapshot xid = " UINT64_FORMAT " "
"(xmin: %u xmax: %u xcnt: %u) curcid: %d, QDxid = "UINT64_FORMAT"/%u",
U64FromFullTransactionId(SharedLocalSnapshotSlot->fullXid),
SharedLocalSnapshotSlot->snapshot.xmin,
SharedLocalSnapshotSlot->snapshot.xmax,
SharedLocalSnapshotSlot->snapshot.xcnt,
SharedLocalSnapshotSlot->snapshot.curcid,
SharedLocalSnapshotSlot->distributedXid,
SharedLocalSnapshotSlot->segmateSync);
LWLockRelease(SharedLocalSnapshotSlot->slotLock);
}
}
}
/*
 * Step 1: classify this process.  At most one of isEntryDbSingleton,
 * isWriterQE and isReaderQE ends up true.
 */
switch (Gp_role)
{
case GP_ROLE_EXECUTE:
/* An executor on the query dispatcher node that is not a writer. */
if (IS_QUERY_DISPATCHER() && !Gp_is_writer)
{
isEntryDbSingleton = true;
}
else
{
/*
 * NOTE: this is a bit hackish. It appears as though
 * StartTransaction() gets called during connection setup
 * before we even have time to setup our shared snapshot slot.
 */
if (SharedLocalSnapshotSlot == NULL)
{
if (explicitBegin || haveDistributedSnapshot)
{
elog(ERROR, "setupQEDtxContext not expecting distributed begin or snapshot when no Snapshot slot exists");
}
}
else
{
if (Gp_is_writer)
{
isWriterQE = true;
}
else
{
isReaderQE = true;
}
}
}
break;
default:
/* Not an executor: nothing to set up, stay in Local Only. */
Assert(DistributedTransactionContext == DTX_CONTEXT_LOCAL_ONLY);
elog(DTM_DEBUG5,
"setupQEDtxContext leaving context = 'Local Only' for Gp_role = %s", role_to_string(Gp_role));
return;
}
elog(DTM_DEBUG5,
"setupQEDtxContext intermediate result: isEntryDbSingleton = %s, isWriterQE = %s, isReaderQE = %s.",
(isEntryDbSingleton ? "true" : "false"),
(isWriterQE ? "true" : "false"), (isReaderQE ? "true" : "false"));
/*
 * Copy to our QE global variable.
 */
DtxContextInfo_Copy(&QEDtxContextInfo, dtxContextInfo);
/*
 * Step 2: pick the new DistributedTransactionContext, depending on the
 * context we are currently in.
 */
switch (DistributedTransactionContext)
{
case DTX_CONTEXT_LOCAL_ONLY:
if (isEntryDbSingleton && haveDistributedSnapshot)
{
/*
 * Later, in GetSnapshotData, we will adopt the QD's
 * transaction and snapshot information.
 */
setDistributedTransactionContext(DTX_CONTEXT_QE_ENTRY_DB_SINGLETON);
}
else if (isReaderQE && haveDistributedSnapshot)
{
/*
 * Later, in GetSnapshotData, we will adopt the QE Writer's
 * transaction and snapshot information.
 */
setDistributedTransactionContext(DTX_CONTEXT_QE_READER);
}
else if (isWriterQE && (explicitBegin || needDtx))
{
if (!haveDistributedSnapshot)
{
elog(DTM_DEBUG5,
"setupQEDtxContext Segment Writer is involved in a distributed transaction without a distributed snapshot...");
}
if (IsTransactionOrTransactionBlock())
{
elog(ERROR, "Starting an explicit distributed transaction in segment -- cannot already be in a transaction");
}
if (explicitBegin)
{
/*
 * We set the DistributedTransactionContext BEFORE we
 * create the transactions to influence the behavior of
 * StartTransaction.
 */
setDistributedTransactionContext(DTX_CONTEXT_QE_TWO_PHASE_EXPLICIT_WRITER);
doQEDistributedExplicitBegin();
}
else
setDistributedTransactionContext(DTX_CONTEXT_QE_TWO_PHASE_IMPLICIT_WRITER);
}
else if (haveDistributedSnapshot)
{
if (IsTransactionOrTransactionBlock())
{
elog(ERROR,
"Going to start a local implicit transaction in segment using a distribute "
"snapshot -- cannot already be in a transaction");
}
/*
 * Before executing the query, postgres.c make a standard call
 * to StartTransactionCommand which will begin a local
 * transaction with StartTransaction. This is fine.
 *
 * However, when the snapshot is created later, the state
 * below will tell GetSnapshotData to make the local snapshot
 * from the distributed snapshot.
 */
setDistributedTransactionContext(DTX_CONTEXT_QE_AUTO_COMMIT_IMPLICIT);
}
else
{
Assert(!haveDistributedSnapshot);
/*
 * A local implicit transaction without reference to a
 * distributed snapshot. Stay in NONE state.
 */
Assert(DistributedTransactionContext == DTX_CONTEXT_LOCAL_ONLY);
}
break;
case DTX_CONTEXT_QE_TWO_PHASE_IMPLICIT_WRITER:
/*
elog(NOTICE, "We should have left this transition state '%s' at the end of the previous command...",
DtxContextToString(DistributedTransactionContext));
*/
Assert(IsTransactionOrTransactionBlock());
/* An explicit BEGIN is not accepted in the implicit-writer context. */
if (explicitBegin)
{
elog(ERROR, "Cannot have an explicit BEGIN statement...");
}
break;
case DTX_CONTEXT_QE_AUTO_COMMIT_IMPLICIT:
elog(ERROR, "We should have left this transition state '%s' at the end of the previous command",
DtxContextToString(DistributedTransactionContext));
break;
case DTX_CONTEXT_QE_TWO_PHASE_EXPLICIT_WRITER:
Assert(IsTransactionOrTransactionBlock());
break;
case DTX_CONTEXT_QE_ENTRY_DB_SINGLETON:
case DTX_CONTEXT_QE_READER:
/*
 * We are playing games with the xact.c code, so we shouldn't test
 * with the IsTransactionOrTransactionBlock() routine.
 */
break;
case DTX_CONTEXT_QE_PREPARED:
case DTX_CONTEXT_QE_FINISH_PREPARED:
elog(ERROR, "We should not be trying to execute a query in state '%s'",
DtxContextToString(DistributedTransactionContext));
break;
default:
elog(PANIC, "Unexpected segment distribute transaction context value: %d",
(int) DistributedTransactionContext);
break;
}
elog(DTM_DEBUG5, "setupQEDtxContext final result: DistributedTransactionContext = '%s'.",
DtxContextToString(DistributedTransactionContext));
/* Snapshot tracing; emitted at LOG level when Debug_print_snapshot_dtm is set. */
if (haveDistributedSnapshot)
{
elog((Debug_print_snapshot_dtm ? LOG : DEBUG5), "[Distributed Snapshot #%u] *Set QE* currcid = %d (gxid = "UINT64_FORMAT", '%s')",
dtxContextInfo->distributedSnapshot.distribSnapshotId,
dtxContextInfo->curcid,
getDistributedTransactionId(),
DtxContextToString(DistributedTransactionContext));
}
}
/*
 * finishDistributedTransactionContext:
 *   Return DistributedTransactionContext to DTX_CONTEXT_LOCAL_ONLY and reset
 *   QEDtxContextInfo.
 *
 * debugCaller is only used in log/error messages; `aborted` is not used.
 * A FATAL error is raised if a DTX is still activated in any state other
 * than the two retry states.
 */
void
finishDistributedTransactionContext(char *debugCaller, bool aborted)
{
	DistributedTransactionId gxid;

	/*
	 * We let the 2 retry states go up to PostgresMain.c, otherwise everything
	 * MUST be complete.
	 */
	if (isCurrentDtxActivated())
	{
		if (MyTmGxactLocal->state != DTX_STATE_RETRY_COMMIT_PREPARED &&
			MyTmGxactLocal->state != DTX_STATE_RETRY_ABORT_PREPARED)
		{
			ereport(FATAL,
					(errmsg("Unexpected dtx status (caller = %s).", debugCaller),
					 TM_ERRDETAIL));
		}
	}

	gxid = getDistributedTransactionId();
	elog(DTM_DEBUG5,
		 "finishDistributedTransactionContext called to change DistributedTransactionContext from %s to %s (caller = %s, gxid = "UINT64_FORMAT")",
		 DtxContextToString(DistributedTransactionContext),
		 DtxContextToString(DTX_CONTEXT_LOCAL_ONLY),
		 debugCaller,
		 gxid);

	setDistributedTransactionContext(DTX_CONTEXT_LOCAL_ONLY);
	DtxContextInfo_Reset(&QEDtxContextInfo);
}
/*
 * rememberDtxExplicitBegin:
 *   Record on the current (activated) DTX that it was started by an explicit
 *   BEGIN.  Calling it again is harmless; either case is logged at
 *   DTM_DEBUG5.
 */
static void
rememberDtxExplicitBegin(void)
{
	Assert (isCurrentDtxActivated());

	if (MyTmGxactLocal->explicitBeginRemembered)
	{
		/* Already recorded; just trace it. */
		ereport(DTM_DEBUG5,
				(errmsg("rememberDtxExplicitBegin already an explicit BEGIN"),
				 TM_ERRDETAIL));
		return;
	}

	ereport(DTM_DEBUG5,
			(errmsg("rememberDtxExplicitBegin explicit BEGIN"),
			 TM_ERRDETAIL));
	MyTmGxactLocal->explicitBeginRemembered = true;
}
/*
 * isDtxExplicitBegin:
 *   True when a DTX is currently activated and an explicit BEGIN has been
 *   remembered for it.
 */
bool
isDtxExplicitBegin(void)
{
	if (!isCurrentDtxActivated())
		return false;

	return MyTmGxactLocal->explicitBeginRemembered ? true : false;
}
/*
 * sendDtxExplicitBegin:
 *   In GP_ROLE_DISPATCH, set up the distributed transaction and remember that
 *   it was begun explicitly; a no-op in every other role.
 *
 * This is mostly here because cdbcopy doesn't use cdbdisp's services.
 */
void
sendDtxExplicitBegin(void)
{
	if (Gp_role == GP_ROLE_DISPATCH)
	{
		setupDtxTransaction();
		rememberDtxExplicitBegin();
	}
}
/**
 * Run the Prepare operation for distributed transaction `gid`.
 *
 * Invoked from performDtxProtocolCommand(), the QE-side protocol handler.
 * On success the DistributedTransactionContext becomes
 * DTX_CONTEXT_QE_PREPARED; if PrepareTransactionBlock() refuses, an ERROR is
 * raised.
 */
static void
performDtxProtocolPrepare(const char *gid)
{
StartTransactionCommand();
elog(DTM_DEBUG5, "performDtxProtocolCommand going to call PrepareTransactionBlock for distributed transaction (id = '%s')", gid);
if (!PrepareTransactionBlock((char *) gid))
{
elog(ERROR, "Prepare of distributed transaction %s failed", gid);
return;
}
/*
 * Calling CommitTransactionCommand will cause the actual COMMIT/PREPARE
 * work to be performed.
 */
CommitTransactionCommand();
elog(DTM_DEBUG5, "Prepare of distributed transaction succeeded (id = '%s')", gid);
setDistributedTransactionContext(DTX_CONTEXT_QE_PREPARED);
}
/*
 * sendWaitGxidsToQD:
 *   Send the integer list waitGxids in a single 'w' message: a 4-byte element
 *   count followed by each element as a 4-byte integer.  Nothing is sent for
 *   an empty list.
 */
static void
sendWaitGxidsToQD(List *waitGxids)
{
	StringInfoData msg;
	ListCell   *cell;
	int			count;

	count = list_length(waitGxids);
	if (count == 0)
		return;

	pq_beginmessage(&msg, 'w');
	pq_sendint(&msg, count, 4);

	foreach(cell, waitGxids)
		pq_sendint(&msg, lfirst_int(cell), 4);

	pq_endmessage(&msg);
}
/**
 * On the QE, run the Commit one-phase operation.
 *
 * Commits the open transaction block for `gid`, returns the distributed
 * transaction context to local-only via finishDistributedTransactionContext(),
 * and finally sends the waited-for gxids that were recorded before the commit.
 * Raises ERROR if EndTransactionBlock() refuses.
 */
static void
performDtxProtocolCommitOnePhase(const char *gid)
{
DistributedTransactionId gxid;
/*
 * Take a copy up front; presumably the commit path clears
 * MyTmGxactLocal->waitGxids before we get to send it -- TODO confirm.
 */
List *waitGxids = list_copy(MyTmGxactLocal->waitGxids);
SIMPLE_FAULT_INJECTOR("start_performDtxProtocolCommitOnePhase");
elog(DTM_DEBUG5,
"performDtxProtocolCommitOnePhase going to call CommitTransaction for distributed transaction %s", gid);
/* The gid must name the distributed transaction this backend is in. */
dtxDeformGid(gid, &gxid);
Assert(gxid == getDistributedTransactionId());
MyTmGxactLocal->isOnePhaseCommit = true;
StartTransactionCommand();
if (!EndTransactionBlock(false))
{
elog(ERROR, "One-phase Commit of distributed transaction %s failed", gid);
return;
}
/* Calling CommitTransactionCommand will cause the actual COMMIT work to be performed. */
CommitTransactionCommand();
finishDistributedTransactionContext("performDtxProtocolCommitOnePhase -- Commit onephase", false);
MyProc->localDistribXactData.state = LOCALDISTRIBXACT_STATE_NONE;
sendWaitGxidsToQD(waitGxids);
}
/**
 * On the QE, run the Commit Prepared operation.
 *
 * gid                  - identifier of the prepared transaction to commit
 * raiseErrorIfNotFound - passed through to FinishPreparedTransaction()
 *
 * On both the success and the error path the distributed transaction context
 * is returned to local-only via finishDistributedTransactionContext(); on
 * error the original error is re-thrown afterwards.
 */
static void
performDtxProtocolCommitPrepared(const char *gid, bool raiseErrorIfNotFound)
{
Assert(Gp_role == GP_ROLE_EXECUTE);
elog(DTM_DEBUG5,
"performDtxProtocolCommitPrepared going to call FinishPreparedTransaction for distributed transaction %s", gid);
/*
 * Take a copy up front; presumably the commit path clears
 * MyTmGxactLocal->waitGxids before we get to send it -- TODO confirm.
 */
List *waitGxids = list_copy(MyTmGxactLocal->waitGxids);
StartTransactionCommand();
/*
 * Since this call may fail, lets setup a handler.
 */
PG_TRY();
{
FinishPreparedTransaction((char *) gid, /* isCommit */ true, raiseErrorIfNotFound);
}
PG_CATCH();
{
/* Leave the DTX context clean before propagating the error. */
finishDistributedTransactionContext("performDtxProtocolCommitPrepared -- Commit Prepared (error case)", false);
PG_RE_THROW();
}
PG_END_TRY();
/*
 * Calling CommitTransactionCommand will cause the actual COMMIT/PREPARE
 * work to be performed.
 */
CommitTransactionCommand();
sendWaitGxidsToQD(waitGxids);
finishDistributedTransactionContext("performDtxProtocolCommitPrepared -- Commit Prepared", false);
}
/**
 * On the QE, run the Abort Prepared operation.
 *
 * gid                  - identifier of the prepared transaction to abort
 * raiseErrorIfNotFound - passed through to FinishPreparedTransaction()
 *
 * On both the success and the error path the distributed transaction context
 * is returned to local-only via finishDistributedTransactionContext(); on
 * error the original error is re-thrown afterwards.
 */
static void
performDtxProtocolAbortPrepared(const char *gid, bool raiseErrorIfNotFound)
{
Assert(Gp_role == GP_ROLE_EXECUTE);
elog(DTM_DEBUG5, "performDtxProtocolAbortPrepared going to call FinishPreparedTransaction for distributed transaction %s", gid);
StartTransactionCommand();
/*
 * Since this call may fail, lets setup a handler.
 */
PG_TRY();
{
FinishPreparedTransaction((char *) gid, /* isCommit */ false, raiseErrorIfNotFound);
}
PG_CATCH();
{
/* Leave the DTX context clean before propagating the error. */
finishDistributedTransactionContext("performDtxProtocolAbortPrepared -- Abort Prepared (error case)", true);
PG_RE_THROW();
}
PG_END_TRY();
/*
 * Calling CommitTransactionCommand will cause the actual COMMIT/PREPARE
 * work to be performed.
 */
CommitTransactionCommand();
finishDistributedTransactionContext("performDtxProtocolAbortPrepared -- Abort Prepared", true);
}
/**
 * On the QE, handle a DtxProtocolCommand
 *
 * dtxProtocolCommand - the command to execute
 * gid                - identifier of the distributed transaction concerned
 * contextInfo        - DTX context info; read by the SUBTRANSACTION_* commands
 *                      (nestingLevel) and handed to setupQEDtxContext() when a
 *                      subtransaction begins in a local-only context
 *
 * Each command checks the current DistributedTransactionContext and either
 * performs the requested action or raises ERROR/FATAL/PANIC for a context it
 * does not expect.  An unrecognized command raises ERROR.
 */
void
performDtxProtocolCommand(DtxProtocolCommand dtxProtocolCommand,
const char *gid,
DtxContextInfo *contextInfo)
{
elog(DTM_DEBUG5,
"performDtxProtocolCommand called with DTX protocol = %s, segment distribute transaction context: '%s'",
DtxProtocolCommandToString(dtxProtocolCommand), DtxContextToString(DistributedTransactionContext));
switch (dtxProtocolCommand)
{
case DTX_PROTOCOL_COMMAND_ABORT_NO_PREPARED:
elog(DTM_DEBUG5,
"performDtxProtocolCommand going to call AbortOutOfAnyTransaction for distributed transaction %s", gid);
AbortOutOfAnyTransaction();
break;
case DTX_PROTOCOL_COMMAND_PREPARE:
case DTX_PROTOCOL_COMMAND_COMMIT_ONEPHASE:
/*
 * The QD has directed us to read-only commit or prepare an
 * implicit or explicit distributed transaction.
 */
switch (DistributedTransactionContext)
{
case DTX_CONTEXT_LOCAL_ONLY:
/*
 * Spontaneously aborted while we were back at the QD?
 */
elog(ERROR, "Distributed transaction %s not found", gid);
break;
case DTX_CONTEXT_QE_TWO_PHASE_EXPLICIT_WRITER:
case DTX_CONTEXT_QE_TWO_PHASE_IMPLICIT_WRITER:
/* Both commands share this path; pick the matching operation. */
if (dtxProtocolCommand == DTX_PROTOCOL_COMMAND_COMMIT_ONEPHASE)
performDtxProtocolCommitOnePhase(gid);
else
performDtxProtocolPrepare(gid);
break;
case DTX_CONTEXT_QD_DISTRIBUTED_CAPABLE:
case DTX_CONTEXT_QD_RETRY_PHASE_2:
case DTX_CONTEXT_QE_PREPARED:
case DTX_CONTEXT_QE_FINISH_PREPARED:
case DTX_CONTEXT_QE_ENTRY_DB_SINGLETON:
case DTX_CONTEXT_QE_READER:
elog(FATAL, "Unexpected segment distribute transaction context: '%s'",
DtxContextToString(DistributedTransactionContext));
break;
default:
elog(PANIC, "Unexpected segment distribute transaction context value: %d",
(int) DistributedTransactionContext);
break;
}
break;
case DTX_PROTOCOL_COMMAND_ABORT_SOME_PREPARED:
switch (DistributedTransactionContext)
{
case DTX_CONTEXT_LOCAL_ONLY:
/*
 * Spontaneously aborted while we were back at the QD?
 *
 * It's normal if the transaction doesn't exist. The QD will
 * call abort on us, even if we didn't finish the prepare yet,
 * if some other QE reported failure already.
 */
elog(DTM_DEBUG3, "Distributed transaction %s not found during abort", gid);
AbortOutOfAnyTransaction();
break;
case DTX_CONTEXT_QE_TWO_PHASE_EXPLICIT_WRITER:
case DTX_CONTEXT_QE_TWO_PHASE_IMPLICIT_WRITER:
/* Not prepared yet: a plain abort is enough. */
AbortOutOfAnyTransaction();
break;
case DTX_CONTEXT_QE_PREPARED:
/* Already prepared: the prepared transaction itself must be aborted. */
setDistributedTransactionContext(DTX_CONTEXT_QE_FINISH_PREPARED);
performDtxProtocolAbortPrepared(gid, /* raiseErrorIfNotFound */ true);
break;
case DTX_CONTEXT_QD_DISTRIBUTED_CAPABLE:
case DTX_CONTEXT_QD_RETRY_PHASE_2:
case DTX_CONTEXT_QE_ENTRY_DB_SINGLETON:
case DTX_CONTEXT_QE_READER:
elog(PANIC, "Unexpected segment distribute transaction context: '%s'",
DtxContextToString(DistributedTransactionContext));
break;
default:
elog(PANIC, "Unexpected segment distribute transaction context value: %d",
(int) DistributedTransactionContext);
break;
}
break;
case DTX_PROTOCOL_COMMAND_COMMIT_PREPARED:
requireDistributedTransactionContext(DTX_CONTEXT_QE_PREPARED);
setDistributedTransactionContext(DTX_CONTEXT_QE_FINISH_PREPARED);
performDtxProtocolCommitPrepared(gid, /* raiseErrorIfNotFound */ true);
break;
case DTX_PROTOCOL_COMMAND_ABORT_PREPARED:
requireDistributedTransactionContext(DTX_CONTEXT_QE_PREPARED);
setDistributedTransactionContext(DTX_CONTEXT_QE_FINISH_PREPARED);
performDtxProtocolAbortPrepared(gid, /* raiseErrorIfNotFound */ true);
break;
/*
 * The RETRY_* and RECOVERY_* variants below require a local-only context
 * and do not raise an error when the prepared transaction is not found.
 */
case DTX_PROTOCOL_COMMAND_RETRY_COMMIT_PREPARED:
requireDistributedTransactionContext(DTX_CONTEXT_LOCAL_ONLY);
performDtxProtocolCommitPrepared(gid, /* raiseErrorIfNotFound */ false);
break;
case DTX_PROTOCOL_COMMAND_RETRY_ABORT_PREPARED:
requireDistributedTransactionContext(DTX_CONTEXT_LOCAL_ONLY);
performDtxProtocolAbortPrepared(gid, /* raiseErrorIfNotFound */ false);
break;
case DTX_PROTOCOL_COMMAND_RECOVERY_COMMIT_PREPARED:
requireDistributedTransactionContext(DTX_CONTEXT_LOCAL_ONLY);
performDtxProtocolCommitPrepared(gid, /* raiseErrorIfNotFound */ false);
break;
case DTX_PROTOCOL_COMMAND_RECOVERY_ABORT_PREPARED:
requireDistributedTransactionContext(DTX_CONTEXT_LOCAL_ONLY);
performDtxProtocolAbortPrepared(gid, /* raiseErrorIfNotFound */ false);
break;
case DTX_PROTOCOL_COMMAND_SUBTRANSACTION_BEGIN_INTERNAL:
switch (DistributedTransactionContext)
{
case DTX_CONTEXT_LOCAL_ONLY:
/*
 * QE is not aware of DTX yet. A typical case is SELECT
 * foo(), where foo() opens internal subtransaction
 */
setupQEDtxContext(contextInfo);
StartTransactionCommand();
break;
case DTX_CONTEXT_QE_TWO_PHASE_IMPLICIT_WRITER:
/*
 * We already marked this QE to be writer, and transaction
 * is open.
 */
case DTX_CONTEXT_QE_TWO_PHASE_EXPLICIT_WRITER:
case DTX_CONTEXT_QE_READER:
break;
default:
/* Lets flag this situation out, with explicit crash */
Assert(false);
elog(DTM_DEBUG5,
" SUBTRANSACTION_BEGIN_INTERNAL distributed transaction context invalid: %d",
(int) DistributedTransactionContext);
break;
}
BeginInternalSubTransaction(NULL);
/* The new nest level must be one deeper than the level sent in contextInfo. */
Assert(contextInfo->nestingLevel + 1 == GetCurrentTransactionNestLevel());
break;
case DTX_PROTOCOL_COMMAND_SUBTRANSACTION_RELEASE_INTERNAL:
Assert(contextInfo->nestingLevel == GetCurrentTransactionNestLevel());
ReleaseCurrentSubTransaction();
break;
case DTX_PROTOCOL_COMMAND_SUBTRANSACTION_ROLLBACK_INTERNAL:
/*
 * Rollback performs work on master and then dispatches, hence has
 * nestingLevel its expecting post operation
 */
if ((contextInfo->nestingLevel + 1) > GetCurrentTransactionNestLevel())
{
ereport(ERROR,
(errmsg("transaction %s at level %d already processed (current level %d)",
gid, contextInfo->nestingLevel, GetCurrentTransactionNestLevel())));
}
/* Roll back one subtransaction per level until we reach the requested one. */
unsigned int i = GetCurrentTransactionNestLevel() - contextInfo->nestingLevel;
while (i > 0)
{
RollbackAndReleaseCurrentSubTransaction();
i--;
}
Assert(contextInfo->nestingLevel == GetCurrentTransactionNestLevel());
break;
default:
elog(ERROR, "Unrecognized dtx protocol command: %d",
(int) dtxProtocolCommand);
break;
}
elog(DTM_DEBUG5, "performDtxProtocolCommand successful return for distributed transaction %s", gid);
}
/*
 * markCurrentGxactWriterGangLost:
 *   Set the writerGangLost flag of this backend's global transaction.
 */
void
markCurrentGxactWriterGangLost(void)
{
MyTmGxactLocal->writerGangLost = true;
}
/*
 * currentGxactWriterGangLost:
 *   Return the writerGangLost flag of this backend's global transaction.
 */
bool
currentGxactWriterGangLost(void)
{
return MyTmGxactLocal->writerGangLost;
}
/*
 * addToGxactDtxSegments:
 *   Record which segments of `gang` are involved in the two phase commit of
 *   the current distributed transaction.
 *
 * Each segment index not seen before is added to both
 * MyTmGxactLocal->dtxSegmentsMap and MyTmGxactLocal->dtxSegments, allocated
 * in TopTransactionContext.  Nothing is done when no DTX is activated or when
 * the list already holds at least getgpsegmentCount() entries.
 */
void
addToGxactDtxSegments(Gang *gang)
{
	MemoryContext savedContext;
	int			pos;

	/* nothing to track without an active DTX, or if all segdbs are listed */
	if (!isCurrentDtxActivated() ||
		list_length(MyTmGxactLocal->dtxSegments) >= getgpsegmentCount())
		return;

	savedContext = MemoryContextSwitchTo(TopTransactionContext);

	for (pos = 0; pos < gang->size; pos++)
	{
		SegmentDatabaseDescriptor *desc = gang->db_descriptors[pos];
		int			contentId;

		Assert(desc);
		contentId = desc->segindex;

		/*
		 * The entry db (-1) is just a reader and will not take part in two
		 * phase commit; segments recorded earlier are skipped as well.
		 */
		if (contentId != -1 &&
			!bms_is_member(contentId, MyTmGxactLocal->dtxSegmentsMap))
		{
			MyTmGxactLocal->dtxSegmentsMap =
				bms_add_member(MyTmGxactLocal->dtxSegmentsMap, contentId);
			MyTmGxactLocal->dtxSegments =
				lappend_int(MyTmGxactLocal->dtxSegments, contentId);
		}
	}

	MemoryContextSwitchTo(savedContext);
}
/*
 * CurrentDtxIsRollingback:
 *   True when the current DTX state is one of the abort-notification states
 *   or the retry-abort-prepared state.
 */
bool
CurrentDtxIsRollingback(void)
{
	switch (MyTmGxactLocal->state)
	{
		case DTX_STATE_NOTIFYING_ABORT_NO_PREPARED:
		case DTX_STATE_NOTIFYING_ABORT_SOME_PREPARED:
		case DTX_STATE_NOTIFYING_ABORT_PREPARED:
		case DTX_STATE_RETRY_ABORT_PREPARED:
			return true;
		default:
			return false;
	}
}
/*
 * gp_get_next_gxid:
 *   SQL-callable function returning ShmemVariableCache->nextGxid as a
 *   uint64.  Restricted to superusers; other callers get an
 *   ERRCODE_INSUFFICIENT_PRIVILEGE error.
 */
Datum
gp_get_next_gxid(PG_FUNCTION_ARGS)
{
DistributedTransactionId next_gxid;
if (!superuser())
ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
(errmsg("Superuser only to execute it"))));
/* Read the counter while holding the gxid generation spinlock. */
SpinLockAcquire(shmGxidGenLock);
next_gxid = ShmemVariableCache->nextGxid;
SpinLockRelease(shmGxidGenLock);
PG_RETURN_UINT64(next_gxid);
}
相关信息
相关文章
greenplumn cdbappendonlystorageformat 源码
greenplumn cdbappendonlystorageread 源码
greenplumn cdbappendonlystoragewrite 源码
greenplumn cdbappendonlyxlog 源码
greenplumn cdbbufferedappend 源码
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦