greenplumn cdbendpointretrieve 源码

  • 2022-08-18
  • 浏览 (398)

greenplumn cdbendpointretrieve 代码

文件路径:/src/backend/cdb/endpoint/cdbendpointretrieve.c

/*-------------------------------------------------------------------------
 * cdbendpointretrieve.c
 *
 * This file includes code of the RETRIEVE operation for the PARALLEL RETRIEVE
 * CURSOR, Once a PARALLEL RETRIEVE CURSOR is declared, the QE backends start
 * to send query results to so-called endpoints (see cdbendpoint.c). The
 * results can be retrieved through dedicated retrieve sessions in shared
 * memory via the shared-memory base message queue mechanism. A retrieve
 * session is a special session that can executes the RETRIEVE statement only.
 *
 * To start a retrieve session, the endpoint's token is needed as the password
 * for authentication. The token could be obtained via some endpoint related
 * UDFs. The user should be the same as the one who declares the parallel
 * retrieve cursor. The guc gp_retrieve_conn=true needs to be set to start the
 * retrieve session. The guc "gp_retrieve_conn=true" imples utility mode
 * connection.
 *
 * Once a retrieve session has attached to an endpoint, no other retrieve
 * session can attach to that endpoint.  It is possible to retrieve multiple
 * endpoints from the same retrieve session if they share the same token. In
 * other words, one retrieve session can attach and retrieve from multiple
 * endpoints.
 *
 * Copyright (c) 2020-Present VMware, Inc. or its affiliates
 *
 * IDENTIFICATION
 *		src/backend/cdb/cdbendpointretrieve.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/session.h"
#include "access/xact.h"
#include "common/hashfn.h"
#include "storage/ipc.h"
#include "utils/backend_cancel.h"
#include "utils/dynahash.h"
#include "utils/elog.h"
#include "utils/faultinjector.h"
#include "utils/guc.h"
#include "cdbendpoint_private.h"
#include "cdb/cdbendpoint.h"
#include "cdb/cdbsrlz.h"

/* Is this the utility-mode backend for RETRIEVE? */
bool		am_cursor_retrieve_handler = false;

/*
 * Was the utility-mode connection retrieve connection authenticated? Just used
 * for sanity-checking.
 */
bool		retrieve_conn_authenticated = false;

/* Retrieve role state. */
enum RetrieveState
{
	RETRIEVE_STATE_INIT,
	RETRIEVE_STATE_ATTACHED,
	RETRIEVE_STATE_RECEIVING,
	RETRIEVE_STATE_FINISHED,
};

/*
 * For receiver, we have a hash table to store connected endpoint's shared
 * message queue. So that we can retrieve from different endpoints in the same
 * retriever and switch between different endpoints.
 *
 * For endpoint, only keep one entry to track current message queue.
 */
typedef struct RetrieveExecEntry
{
	/* The name of endpoint to be retrieved, also behave as hash key */
	char		endpointName[NAMEDATALEN];
	/* The endpoint to be retrieved */
	Endpoint	*endpoint;
	/* The dsm handle which contains shared memory message queue */
	dsm_segment *mqSeg;
	/* Shared memory message queue */
	shm_mq_handle *mqHandle;
	/* tuple slot used for retrieve data */
	TupleTableSlot *retrieveTs;
	/* TupleQueueReader to read tuple from message queue */
	TupleQueueReader *tqReader;
	/* Track retrieve state */
	enum RetrieveState retrieveState;
}			RetrieveExecEntry;

/*
 * Local structure to the current retrieve operation.
 */
typedef struct RetrieveControl
{
	/*
	 * Track current retrieve entry in executor. Multiple entries are allowed
	 * to be in one retrieve session but only one entry is active.
	 */
	RetrieveExecEntry *current_entry;

	/*
	 * Hash table to cache tuple descriptors for all endpoint_names which have
	 * been retrieved in this retrieve session.
	 */
	HTAB	   *RetrieveExecEntryHTB;

	/*
	 * The endpoint sessionID of the current retrieve entry. Assigned after
	 * authentication.
	 */
	int			sessionID;
}			RetrieveControl;

static RetrieveControl RetrieveCtl =
{
	NULL, NULL, InvalidEndpointSessionId
};

static void init_retrieve_exec_entry(RetrieveExecEntry * entry);
static Endpoint *get_endpoint_from_retrieve_exec_entry(RetrieveExecEntry *entry, bool noError);
static void start_retrieve(const char *endpointName);
static void validate_retrieve_endpoint(Endpoint *endpointDesc, const char *endpointName);
static void finish_retrieve(bool resetPID);
static void attach_receiver_mq(dsm_handle dsmHandle);
static void detach_receiver_mq(RetrieveExecEntry *entry);
static void notify_sender(bool finished);
static void retrieve_cancel_action(RetrieveExecEntry *entry, char *msg);
static void retrieve_exit_callback(int code, Datum arg);
static void retrieve_xact_callback(XactEvent ev, void *arg);
static void retrieve_subxact_callback(SubXactEvent event,
									  SubTransactionId mySubid,
									  SubTransactionId parentSubid,
									  void *arg);
static TupleTableSlot *retrieve_next_tuple(void);

/*
 * AuthEndpoint - Authenticate for retrieve connection.
 *
 * Return true if authentication passes.
 */
bool
AuthEndpoint(Oid userID, const char *tokenStr)
{
	bool		found = false;
	int8		token[ENDPOINT_TOKEN_ARR_LEN] = {0};

	endpoint_token_str2arr(tokenStr, token);

	RetrieveCtl.sessionID = get_session_id_from_token(userID, token);
	if (RetrieveCtl.sessionID != InvalidEndpointSessionId)
	{
		found = true;
		before_shmem_exit(retrieve_exit_callback, (Datum) 0);
		RegisterSubXactCallback(retrieve_subxact_callback, NULL);
		RegisterXactCallback(retrieve_xact_callback, NULL);
	}

	return found;
}

/*
 * GetRetrieveStmtTupleDesc - Gets TupleDesc for the given retrieve statement.
 *
 * This function calls start_retrieve() to initialize related data structure
 * and returns the tuple descriptor.
 */
TupleDesc
GetRetrieveStmtTupleDesc(const RetrieveStmt * stmt)
{
	start_retrieve(stmt->endpoint_name);

	return RetrieveCtl.current_entry->retrieveTs->tts_tupleDescriptor;
}

/*
 * ExecRetrieveStmt - Execute the given RetrieveStmt.
 *
 * This function tries to use the endpoint name in the RetrieveStmt to find the
 * attached endpoint in this retrieve session. If the endpoint can be found,
 * then read from the message queue to feed the active portal's tuplestore. And
 * mark the endpoint as detached before returning.
 */
void
ExecRetrieveStmt(const RetrieveStmt *stmt, DestReceiver *dest)
{
	TupleTableSlot *result = NULL;
	int64		retrieveCount = 0;

	if (RetrieveCtl.current_entry == NULL)
		ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
						errmsg("endpoint %s is not attached",
							   stmt->endpoint_name)));

	retrieveCount = stmt->count;
	if (retrieveCount <= 0 && !stmt->is_all)
		ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
						errmsg("RETRIEVE statement only supports forward scan, "
							   "count should not be: %ld",
							   retrieveCount)));

	Assert(dest->mydest == DestTuplestore);
	Assert(RetrieveCtl.current_entry->retrieveState > RETRIEVE_STATE_INIT);

	if (RetrieveCtl.current_entry->retrieveState < RETRIEVE_STATE_FINISHED)
	{
		while (stmt->is_all || retrieveCount > 0)
		{
			result = retrieve_next_tuple();
			if (!result)
				break;

			(*dest->receiveSlot) (result, dest);
			if (!stmt->is_all)
				retrieveCount--;
		}
	}
	else
	{
		/* All tuples have already been retrieved. Nothing to do */
	}

	finish_retrieve(false);
}

/*
 * init_retrieve_exec_entry - Initialize RetrieveExecEntry.
 */
static void
init_retrieve_exec_entry(RetrieveExecEntry * entry)
{
	entry->mqSeg = NULL;
	entry->endpoint = NULL;
	entry->mqHandle = NULL;
	entry->retrieveTs = NULL;
	entry->retrieveState = RETRIEVE_STATE_INIT;
}

/*
 * get_endpoint_from_retrieve_exec_entry
 *
 * Get endpoint from the entry if exists and validate the endpoint slot
 * still belong to current entry since it may get reused by other endpoints.
 *
 * if there is something wrong during validation, warn or error out, depending
 * on the parameter noError.
 */
static Endpoint*
get_endpoint_from_retrieve_exec_entry(RetrieveExecEntry * entry, bool noError)
{
	Assert(LWLockHeldByMe(ParallelCursorEndpointLock));

	/* Sanity check and error out if needed. */
	if (entry->endpoint != NULL)
	{
		if (entry->endpoint->empty)
		{
			ereport(noError ? WARNING : ERROR,
					(errcode(ERRCODE_INTERNAL_ERROR),
					 errmsg("endpoint is not available because the parallel "
							"retrieve cursor was aborted")));
			entry->endpoint = NULL;
		}
		else if (!(endpoint_name_equals(entry->endpoint->name, entry->endpointName) &&
				   RetrieveCtl.sessionID == entry->endpoint->sessionID))
		{
			ereport(noError ? WARNING : ERROR,
					(errcode(ERRCODE_INTERNAL_ERROR),
					 errmsg("endpoint slot in RetrieveExecEntry is reused by others"),
					 errdetail("endpoint name (%s vs %s), session ID (%d vs %d)",
							   entry->endpoint->name, entry->endpointName,
							   RetrieveCtl.sessionID, entry->endpoint->sessionID)));
			entry->endpoint = NULL;
		}
	}

	return entry->endpoint;
}

/*
 * Initialize a hashtable, its key is the endpoint's name, its value is
 * RetrieveExecEntry
*/
void InitRetrieveCtl(void)
{
		HASHCTL		ctl;

		if (RetrieveCtl.RetrieveExecEntryHTB)
			return;

		MemSet(&ctl, 0, sizeof(ctl));
		ctl.keysize = NAMEDATALEN;
		ctl.entrysize = sizeof(RetrieveExecEntry);
		ctl.hash = string_hash;
		RetrieveCtl.RetrieveExecEntryHTB = hash_create("retrieve hash", MAX_ENDPOINT_SIZE, &ctl,
													   (HASH_ELEM | HASH_FUNCTION));
		RetrieveCtl.current_entry = NULL;
}

/*
 * start_retrieve - start to retrieve an endpoint.
 *
 * Initialize current retrieve RetrieveExecEntry for the given
 * endpoint if it's the first time to retrieve the endpoint.
 * Attach to the endpoint's shm_mq.
 *
 * Set the endpoint status to ENDPOINTSTATE_RETRIEVING.
 *
 * When call RETRIEVE statement in PQprepare() & PQexecPrepared(), this func will
 * be called 2 times.
 */
static void
start_retrieve(const char *endpointName)
{
	HTAB	   *entryHTB = RetrieveCtl.RetrieveExecEntryHTB;
	RetrieveExecEntry *entry = NULL;
	bool		found = false;
	Endpoint	*endpoint;
	dsm_handle	handle = DSM_HANDLE_INVALID;


	entry = hash_search(entryHTB, endpointName, HASH_FIND, &found);

	LWLockAcquire(ParallelCursorEndpointLock, LW_EXCLUSIVE);

	if (found)
		endpoint = get_endpoint_from_retrieve_exec_entry(entry, false);
	else
	{
		endpoint = find_endpoint(endpointName, RetrieveCtl.sessionID);
		if (!endpoint)
			ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
							errmsg("the endpoint %s does not exist for session id %d",
								   endpointName, RetrieveCtl.sessionID)));
		validate_retrieve_endpoint(endpoint, endpointName);
		endpoint->receiverPid = MyProcPid;
		handle = endpoint->mqDsmHandle;

		/* insert it into hashtable */
		entry = hash_search(entryHTB, endpointName, HASH_ENTER, NULL);
		init_retrieve_exec_entry(entry);
	}

	/* begins to retrieve tuples from endpoint if still have data to retrieve. */
	if (endpoint->state == ENDPOINTSTATE_READY ||
		endpoint->state == ENDPOINTSTATE_ATTACHED)
	{
		endpoint->state = ENDPOINTSTATE_RETRIEVING;
	}

	LWLockRelease(ParallelCursorEndpointLock);

	entry->endpoint = endpoint;

	RetrieveCtl.current_entry = entry;

	if (!found)
		attach_receiver_mq(handle);

	if (CurrentSession->segment == NULL)
		AttachSession(endpoint->sessionDsmHandle);
}

/*
 * validate_retrieve_endpoint - after finding the retrieve endpoint,
 * validate whether it meets the requirement.
 */
static void
validate_retrieve_endpoint(Endpoint *endpoint, const char *endpointName)
{
	Assert(endpoint->mqDsmHandle != DSM_HANDLE_INVALID);

	if (endpoint->userID != GetSessionUserId())
	{
		ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
						errmsg("the PARALLEL RETRIEVE CURSOR was created by "
							   "a different user"),
						errhint("Use the same user as the PARALLEL "
								"RETRIEVE CURSOR creator to retrieve.")));
	}

	switch (endpoint->state)
	{
		case ENDPOINTSTATE_FINISHED:
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					 errmsg("another session (pid: %d) used the endpoint and "
							"completed retrieving",
							endpoint->receiverPid)));
			break;				/* make compiler happy */
		case ENDPOINTSTATE_READY:
		case ENDPOINTSTATE_ATTACHED:
			break;
		default:
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					 errmsg("endpoint %s (state: %s) was used by another retrieve session (pid: %d)",
							endpointName,
							state_enum_to_string(endpoint->state),
							endpoint->receiverPid),
					 errdetail("If pid is -1, that session has been detached.")));
	}

	if (endpoint->receiverPid != InvalidPid && endpoint->receiverPid != MyProcPid)
	{
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("endpoint %s was already attached by receiver(pid: %d)",
						endpointName, endpoint->receiverPid),
				 errdetail("An endpoint can only be attached by one retrieving session.")));
	}
}

/*
 * Attach to the endpoint's shared memory message queue.
 */
static void
attach_receiver_mq(dsm_handle dsmHandle)
{
	TupleDesc	td;
	TupleDescNode *tupdescnode;
	MemoryContext oldcontext;
	shm_toc    *toc;
	void	   *lookup_space;
	int			td_len;
	RetrieveExecEntry *entry = RetrieveCtl.current_entry;

	Assert(!entry->mqSeg);
	Assert(!entry->mqHandle);
	Assert(entry->retrieveState == RETRIEVE_STATE_INIT);

	/*
	 * Store the result slot all the retrieve mode QE life cycle, we only have
	 * one chance to build it.
	 */
	oldcontext = MemoryContextSwitchTo(TopMemoryContext);

	elogif(gp_log_endpoints, LOG, "CDB_ENDPOINT: init message queue conn for receiver");

	entry->mqSeg = dsm_attach(dsmHandle);
	if (entry->mqSeg == NULL)
		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
						errmsg("attach to endpoint shared message queue failed")));

	dsm_pin_mapping(entry->mqSeg);
	toc = shm_toc_attach(ENDPOINT_MSG_QUEUE_MAGIC, dsm_segment_address(entry->mqSeg));
	if (toc == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("invalid magic number in dynamic shared memory segment")));

	/*
	 * Find the shared mq for tuple receiving from 'toc' and set up the
	 * connection.
	 */
	shm_mq	   *mq = shm_toc_lookup(toc, ENDPOINT_KEY_TUPLE_QUEUE, false);

	shm_mq_set_receiver(mq, MyProc);
	entry->mqHandle = shm_mq_attach(mq, entry->mqSeg, NULL);

	/*
	 * Find the tuple descritpr information from 'toc' and set the tuple
	 * descriptor.
	 */
	lookup_space = shm_toc_lookup(toc, ENDPOINT_KEY_TUPLE_DESC_LEN, false);
	td_len = *(int *) lookup_space;
	lookup_space = shm_toc_lookup(toc, ENDPOINT_KEY_TUPLE_DESC, false);
	tupdescnode = (TupleDescNode *) deserializeNode(lookup_space, td_len);
	td = tupdescnode->tuple;
	if (entry->retrieveTs != NULL)
		ExecClearTuple(entry->retrieveTs);
	else
		entry->retrieveTs = MakeTupleTableSlot(td, &TTSOpsHeapTuple);

	/* Create the tuple queue reader. */
	entry->tqReader = CreateTupleQueueReader(entry->mqHandle);
	entry->retrieveState = RETRIEVE_STATE_ATTACHED;

	MemoryContextSwitchTo(oldcontext);
}

/*
 * Detach from the endpoint's message queue.
 */
static void
detach_receiver_mq(RetrieveExecEntry * entry)
{
	shm_mq_detach(entry->mqHandle);
	entry->mqHandle = NULL;

	dsm_detach(entry->mqSeg);
	entry->mqSeg = NULL;
}

/*
 * Notify the sender to stop waiting on the ackDone latch.
 *
 * If current endpoint get freed, it means the endpoint aborted.
 */
static void
notify_sender(bool finished)
{
	Endpoint	*endpoint;

	LWLockAcquire(ParallelCursorEndpointLock, LW_SHARED);
	endpoint = get_endpoint_from_retrieve_exec_entry(RetrieveCtl.current_entry, false);
	if (finished)
		endpoint->state = ENDPOINTSTATE_FINISHED;
	SetLatch(&endpoint->ackDone);
	LWLockRelease(ParallelCursorEndpointLock);
}

/*
 * Read a tuple from shared memory message queue.
 *
 * When reading all tuples, should tell sender that retrieve is done.
 */
static TupleTableSlot *
retrieve_next_tuple()
{
	TupleTableSlot *result = NULL;
	HeapTuple	tup = NULL;
	bool		readerdone = false;
	RetrieveExecEntry *entry = RetrieveCtl.current_entry;

	CHECK_FOR_INTERRUPTS();

	/* at the first time to retrieve data */
	if (entry->retrieveState == RETRIEVE_STATE_ATTACHED)
	{
		/*
		 * try to receive data with nowait, so that empty result will not hang
		 * here
		 */
		tup = TupleQueueReaderNext(entry->tqReader, true, &readerdone);

		entry->retrieveState = RETRIEVE_STATE_RECEIVING;

		/*
		 * at the first time to retrieve data, tell sender not to wait at
		 * wait_receiver()
		 */
		elogif(gp_log_endpoints, LOG, "CDB_ENDPOINT: receiver notifies sender in "
			   "retrieve_next_tuple() when retrieving data for the first time");
		notify_sender(false);
	}

	SIMPLE_FAULT_INJECTOR("fetch_tuples_from_endpoint");

	/*
	 * re-retrieve data in wait mode if not the first time retrieve data or if
	 * the first time retrieve an invalid data, but not finish
	 */
	if (readerdone == false && tup == NULL)
		tup = TupleQueueReaderNext(entry->tqReader, false, &readerdone);

	/* readerdone returns true only after sender detached message queue */
	if (readerdone)
	{
		Assert(!tup);
		DestroyTupleQueueReader(entry->tqReader);
		entry->tqReader = NULL;

		/*
		 * dsm_detach will send SIGUSR1 to sender which may interrupt the
		 * procLatch. But sender will wait on procLatch after finishing
		 * sending. So dsm_detach has to be called earlier to ensure the
		 * SIGUSR1 is coming from the CLOSE CURSOR.
		 */
		detach_receiver_mq(entry);

		/*
		 * We do not call DetachSession() here since we still need that for
		 * the transient record tuples.
		 */
		entry->retrieveState = RETRIEVE_STATE_FINISHED;
		notify_sender(true);
		return NULL;
	}

	if (HeapTupleIsValid(tup))
	{
		ExecClearTuple(entry->retrieveTs);
		result = entry->retrieveTs;
		ExecStoreHeapTuple(tup, /* tuple to store */
						   result,	/* slot in which to store the tuple */
						   false);	/* slot should not pfree tuple */
	}
	return result;
}

/*
 * finish_retrieve - Finish a retrieve statement.
 *
 * If current retrieve statement retrieve all tuples from endpoint. Set
 * endpoint state to ENDPOINTSTATE_FINISHED.  Otherwise, set endpoint's status
 * from ENDPOINTSTATE_RETRIEVING to ENDPOINTSTATE_ATTACHED.
 *
 * Note: don't drop the result slot, we only have one chance to built it.
 * Errors in these function is not expected to be raised.
 */
static void
finish_retrieve(bool resetPID)
{
	Endpoint	*endpoint = NULL;
	RetrieveExecEntry *entry = RetrieveCtl.current_entry;

	Assert(entry);

	LWLockAcquire(ParallelCursorEndpointLock, LW_EXCLUSIVE);
	endpoint = get_endpoint_from_retrieve_exec_entry(entry, true);
	if (endpoint == NULL)
	{
		/*
		 * The endpoint has already cleaned up the Endpoint entry, or during
		 * the retrieve abort stage, sender cleaned the Endpoint entry. And
		 * another endpoint gets allocated just after the cleanup, which will
		 * occupy current endpoint entry.
		 * remove the entry from RetrieveCtl.RetrieveExecEntryHTB also.
		 * to avoid next statement in start_retrieve can get entry from RetrieveCtl.RetrieveExecEntryHTB,
		 * however, can not get endpoint through get_endpoint_from_retrieve_exec_entry.
		 */
		LWLockRelease(ParallelCursorEndpointLock);
		elogif(gp_log_endpoints, LOG, "the Endpoint entry %s has already been cleaned, \
			   remove from RetrieveCtl.RetrieveExecEntryHTB hash table", entry->endpointName);
		hash_search(RetrieveCtl.RetrieveExecEntryHTB, entry->endpointName, HASH_REMOVE, NULL);
		RetrieveCtl.current_entry = NULL;
		return;
	}

	/*
	 * If the receiver pid get retrieve_cancel_action, the receiver pid is
	 * invalid.
	 */
	if (endpoint->receiverPid != MyProcPid && endpoint->receiverPid != InvalidPid)
		ereport(ERROR,
				(errcode(ERRCODE_INTERNAL_ERROR),
				 errmsg("unmatched pid, expected %d but it's %d", MyProcPid,
						endpoint->receiverPid)));

	if (resetPID)
		endpoint->receiverPid = InvalidPid;

	/* Don't set if ENDPOINTSTATE_FINISHED */
	if (endpoint->state == ENDPOINTSTATE_RETRIEVING)
	{
		/*
		 * If finish retrieving, set the endpoint to FINISHED, otherwise set
		 * the endpoint to ATTACHED.
		 */
		if (entry->retrieveState == RETRIEVE_STATE_FINISHED)
			endpoint->state = ENDPOINTSTATE_FINISHED;
		else
			endpoint->state = ENDPOINTSTATE_ATTACHED;
	}

	LWLockRelease(ParallelCursorEndpointLock);
	RetrieveCtl.current_entry = NULL;
}

/*
 * When retrieve role exit with error, let endpoint/sender know exception
 * happened.
 */
static void
retrieve_cancel_action(RetrieveExecEntry * entry, char *msg)
{
	Endpoint	*endpoint;

	Assert(entry);

	LWLockAcquire(ParallelCursorEndpointLock, LW_EXCLUSIVE);

	endpoint = get_endpoint_from_retrieve_exec_entry(entry, true);

	if (endpoint != NULL &&
		endpoint->receiverPid == MyProcPid &&
		endpoint->state != ENDPOINTSTATE_FINISHED)
	{
		endpoint->receiverPid = InvalidPid;
		endpoint->state = ENDPOINTSTATE_RELEASED;
		if (endpoint->senderPid != InvalidPid)
		{
			elogif(gp_log_endpoints, LOG, "CDB_ENDPOINT: signal sender to abort");
			SetBackendCancelMessage(endpoint->senderPid, msg);
			kill(endpoint->senderPid, SIGINT);
		}
	}

	LWLockRelease(ParallelCursorEndpointLock);
}

/*
 * Callback when retrieve role on proc exit, before shmem exit.
 *
 * For Process Exists:
 * If a retrieve session has been retrieved from more than one endpoint, all of
 * the endpoints and their message queues in this session have to be detached when
 * process exits. In this case, the active RetrieveExecEntry will be detached
 * first in retrieve_exit_callback. Thus, no need to detach it again in
 * retrieve_xact_callback.
 *
 * shmem_exit()
 * --> ... (other before shmem callback if exists)
 * --> retrieve_exit_callback
 *	   --> cancel sender if needed.
 *	   --> detach all message queue dsm
 * --> ShutdownPostgres (the last before shmem callback)
 *	   --> AbortOutOfAnyTransaction
 *		   --> AbortTransaction
 *			   --> CallXactCallbacks
 *				   --> retrieve_xact_callback
 *		   --> CleanupTransaction
 * --> dsm_backend_shutdown
 *
 * For Normal Transaction Aborts:
 * Retriever clean up job will be done in xact abort callback
 * retrieve_xact_callback which will only clean the active
 * RetrieveExecEntry.
 *
 * Question:
 * Is it better to detach the dsm we created/attached before dsm_backend_shutdown?
 * Or we can let dsm_backend_shutdown do the detach for us, so we don't need
 * register call back in before_shmem_exit.
 */
static void
retrieve_exit_callback(int code, Datum arg)
{
	HASH_SEQ_STATUS status;
	RetrieveExecEntry *entry;
	HTAB	   *entryHTB = RetrieveCtl.RetrieveExecEntryHTB;

	elogif(gp_log_endpoints, LOG, "CDB_ENDPOINT: retrieve exit callback");

	/* Nothing to do if the hashtable is not ready. */
	if (entryHTB == NULL)
		return;

	/* If the current retrieve statement has not fnished in this run. */
	if (RetrieveCtl.current_entry)
		finish_retrieve(true);

	/* Cancel all partially retrieved endpoints in this session. */
	hash_seq_init(&status, entryHTB);
	while ((entry = (RetrieveExecEntry *) hash_seq_search(&status)) != NULL)
	{
		if (entry->retrieveState != RETRIEVE_STATE_FINISHED)
			retrieve_cancel_action(entry, "Endpoint retrieve session is "
								   "quitting. All unfinished parallel retrieve "
								   "cursors on the session will be terminated.");
		if (entry->mqSeg)
			detach_receiver_mq(entry);
	}
	entryHTB = NULL;

	if (CurrentSession->segment != NULL)
		DetachSession();
}

/*
 * Retrieve role xact abort callback.
 *
 * If normal abort, finish_retrieve() and retrieve_cancel_action() will be
 * called once in current function for current endpoint_name. but if it's proc
 * exit, these two methods will be called twice for current endpoint_name,
 * since we call these two methods before dsm detach.
 */
static void
retrieve_xact_callback(XactEvent ev, void *arg pg_attribute_unused())
{
	if (ev == XACT_EVENT_ABORT)
	{
		elogif(gp_log_endpoints, LOG, "CDB_ENDPOINT: retrieve xact abort callback");
		if (RetrieveCtl.sessionID != InvalidEndpointSessionId &&
			RetrieveCtl.current_entry)
		{
			if (RetrieveCtl.current_entry->retrieveState != RETRIEVE_STATE_FINISHED)
				retrieve_cancel_action(RetrieveCtl.current_entry,
									   "Endpoint retrieve statement aborted");
			finish_retrieve(true);

		}
	}

	if (CurrentSession != NULL && CurrentSession->segment != NULL)
		DetachSession();
}

/*
 * Callback for retrieve role's sub-xact abort .
 */
static void
retrieve_subxact_callback(SubXactEvent event,
						  SubTransactionId mySubid pg_attribute_unused(),
						  SubTransactionId parentSubid pg_attribute_unused(),
						  void *arg pg_attribute_unused())
{
	if (event == SUBXACT_EVENT_ABORT_SUB)
		retrieve_xact_callback(XACT_EVENT_ABORT, NULL);
}

相关信息

greenplumn 源码目录

相关文章

greenplumn cdbendpoint 源码

greenplumn cdbendpoint_private 源码

greenplumn cdbendpointutils 源码

0  赞