greenplumn cdbdisp 源码

  • 2022-08-18
  • 浏览 (451)

greenplumn cdbdisp 代码

文件路径:/src/backend/cdb/dispatcher/cdbdisp.c

/*-------------------------------------------------------------------------
 *
 * cdbdisp.c
 *	  Functions to dispatch commands to QExecutors.
 *
 *
 * Portions Copyright (c) 2005-2008, Greenplum inc
 * Portions Copyright (c) 2012-Present VMware, Inc. or its affiliates.
 *
 *
 * IDENTIFICATION
 *	    src/backend/cdb/dispatcher/cdbdisp.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "storage/ipc.h"		/* For proc_exit_inprogress */
#include "tcop/tcopprot.h"
#include "cdb/cdbdisp.h"
#include "cdb/cdbdisp_async.h"
#include "cdb/cdbdispatchresult.h"
#include "executor/execUtils.h"
#include "libpq-fe.h"
#include "libpq-int.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "cdb/cdbfts.h"
#include "cdb/cdbgang.h"
#include "cdb/cdbsreh.h"
#include "cdb/cdbvars.h"
#include "utils/resowner.h"

static int numNonExtendedDispatcherState = 0;

dispatcher_handle_t *open_dispatcher_handles;
static void cleanup_dispatcher_handle(dispatcher_handle_t *h);

static dispatcher_handle_t *find_dispatcher_handle(CdbDispatcherState *ds);
static dispatcher_handle_t *allocate_dispatcher_handle(void);
static void destroy_dispatcher_handle(dispatcher_handle_t *h);
static char * segmentsListToString(const char *prefix, List *segments);

static DispatcherInternalFuncs *pDispatchFuncs = &DispatcherAsyncFuncs;

/*
 * cdbdisp_dispatchToGang:
 * Send the strCommand SQL statement to the subset of all segdbs in the cluster
 * specified by the gang parameter. cancelOnError indicates whether an error
 * occurring on one of the qExec segdbs should cause all still-executing commands to cancel
 * on other qExecs. Normally this would be true. The commands are sent over the libpq
 * connections that were established during cdblink_setup.
 *
 * The caller must provide a CdbDispatchResults object having available
 * resultArray slots sufficient for the number of QEs to be dispatched:
 * i.e., resultCapacity - resultCount >= gp->size.	This function will
 * assign one resultArray slot per QE of the Gang, paralleling the Gang's
 * db_descriptors array. Success or failure of each QE will be noted in
 * the QE's CdbDispatchResult entry; but before examining the results, the
 * caller must wait for execution to end by calling cdbdisp_checkDispatchResult().
 *
 * The CdbDispatchResults object owns some malloc'ed storage, so the caller
 * must make certain to free it by calling cdbdisp_destroyDispatcherState().
 *
 * When dispatchResults->cancelOnError is false, strCommand is to be
 * dispatched to every connected gang member if possible, despite any
 * cancellation requests, QE errors, connection failures, etc.
 *
 * NB: This function should return normally even if there is an error.
 * It should not longjmp out via elog(ERROR, ...), ereport(ERROR, ...),
 * PG_THROW, CHECK_FOR_INTERRUPTS, etc.
 */
void
cdbdisp_dispatchToGang(struct CdbDispatcherState *ds,
					   struct Gang *gp,
					   int sliceIndex)
{
	Assert(Gp_role == GP_ROLE_DISPATCH);
	Assert(gp && gp->size > 0);
	Assert(ds->primaryResults && ds->primaryResults->resultArray);

	(pDispatchFuncs->dispatchToGang) (ds, gp, sliceIndex);
}

/*
 * For asynchronous dispatcher, we have to wait all dispatch to finish before we move on to query execution,
 * otherwise we may get into a deadlock situation, e.g, gather motion node waiting for data,
 * while segments waiting for plan.
 */
void
cdbdisp_waitDispatchFinish(struct CdbDispatcherState *ds)
{
	if (pDispatchFuncs->waitDispatchFinish != NULL)
		(pDispatchFuncs->waitDispatchFinish) (ds);
}

/*
 * cdbdisp_checkDispatchAckMessage:
 *
 * On QD, check if any expected acknowledge messages from QEs have arrived.
 * In some cases, QD needs to check or wait the expected acknowledge messages
 * from QEs, e.g. when defining a parallel retrieve cursor, so that QD can
 * know if QEs run as expected.
 *
 * message: specifies the expected ACK message to check.
 * timeout_sec: the second that the dispatcher waits for the ack messages at most.
 *       0 means checking immediately, and -1 means waiting until all ack
 *       messages are received.
 */
bool
cdbdisp_checkDispatchAckMessage(struct CdbDispatcherState *ds,
							   const char *message, int timeout_sec)
{
	if (pDispatchFuncs == NULL || pDispatchFuncs->checkAckMessage == NULL)
		return false;

	return (pDispatchFuncs->checkAckMessage) (ds, message, timeout_sec);
}

/*
 * cdbdisp_checkDispatchResult:
 *
 * Waits for completion of threads launched by cdbdisp_dispatchToGang().
 *
 * QEs that were dispatched with 'cancelOnError' true and are not yet idle
 * will be canceled/finished according to waitMode.
 */
void
cdbdisp_checkDispatchResult(struct CdbDispatcherState *ds,
					   DispatchWaitMode waitMode)
{
	(pDispatchFuncs->checkResults) (ds, waitMode);

	if (log_dispatch_stats)
		ShowUsage("DISPATCH STATISTICS");

	if (DEBUG1 >= log_min_messages)
	{
		char		msec_str[32];

		switch (check_log_duration(msec_str, false))
		{
			case 1:
			case 2:
				ereport(LOG,
						(errmsg("duration to dispatch result received from all QEs: %s ms", msec_str)));
				break;
		}
	}
}

/*
 * cdbdisp_getDispatchResults:
 *
 * Block until all QEs return results or report errors.
 *
 * Return Values:
 *   Return NULL If one or more QEs got Error. In that case, *qeErrors contains
 *   a list of ErrorDatas.
 */
struct CdbDispatchResults *
cdbdisp_getDispatchResults(struct CdbDispatcherState *ds, ErrorData **qeError)
{
	int			errorcode;

	if (!ds || !ds->primaryResults)
	{
		/*
		 * Fallback in case we have no dispatcher state.  Since the caller is
		 * likely to output the errors on NULL return, add an error message to
		 * aid debugging.
		 */
		if (errstart(ERROR, TEXTDOMAIN))
		{
			errcode(ERRCODE_INTERNAL_ERROR);
			errmsg("no dispatcher state");
			*qeError = errfinish_and_return(__FILE__, __LINE__, PG_FUNCNAME_MACRO);
		}
		else
			pg_unreachable();

		return NULL;
	}

	/* check if any error reported */
	errorcode = ds->primaryResults->errcode;

	if (errorcode)
	{
		cdbdisp_dumpDispatchResults(ds->primaryResults, qeError);
		return NULL;
	}

	return ds->primaryResults;
}

/*
 * CdbDispatchHandleError
 *
 * When caller catches an error, the PG_CATCH handler can use this
 * function instead of cdbdisp_finishCommand to wait for all QEs
 * to finish, and report QE errors if appropriate.
 * This function should be called only from PG_CATCH handlers.
 *
 * This function doesn't cleanup dispatcher state, dispatcher state
 * will be destroyed as part of the resource owner cleanup.
 *
 * On return, the caller is expected to finish its own cleanup and
 * exit via PG_RE_THROW().
 */
void
CdbDispatchHandleError(struct CdbDispatcherState *ds)
{
	int		qderrcode;
	bool		useQeError = false;
	ErrorData *error = NULL;

	/*
	 * If cdbdisp_dispatchToGang() wasn't called, don't wait.
	 */
	if (!ds || !ds->primaryResults)
		return;

	/*
	 * Request any remaining commands executing on qExecs to stop. We need to
	 * wait for the threads to finish. This allows for proper cleanup of the
	 * results from the async command executions. Cancel any QEs still
	 * running.
	 */
	cdbdisp_cancelDispatch(ds);

	/*
	 * When a QE stops executing a command due to an error, as a consequence
	 * there can be a cascade of interconnect errors (usually "sender closed
	 * connection prematurely") thrown in downstream processes (QEs and QD).
	 * So if we are handling an interconnect error, and a QE hit a more
	 * interesting error, we'll let the QE's error report take precedence.
	 */
	qderrcode = elog_geterrcode();
	if (qderrcode == ERRCODE_GP_INTERCONNECTION_ERROR)
	{
		bool		qd_lost_flag = false;
		char	   *qderrtext = elog_message();

		if (qderrtext
			&& strcmp(qderrtext, CDB_MOTION_LOST_CONTACT_STRING) == 0)
			qd_lost_flag = true;

		if (ds->primaryResults && ds->primaryResults->errcode)
		{
			if (qd_lost_flag
				&& ds->primaryResults->errcode == ERRCODE_GP_INTERCONNECTION_ERROR)
				useQeError = true;
			else if (ds->primaryResults->errcode != ERRCODE_GP_INTERCONNECTION_ERROR)
				useQeError = true;
		}
	}

	if (useQeError)
	{
		/*
		 * Throw the QE's error, catch it, and fall thru to return normally so
		 * caller can finish cleaning up. Afterwards caller must exit via
		 * PG_RE_THROW().
		 */
		MemoryContext oldcontext;

		/*
		 * During abort processing, we are running in ErrorContext. Avoid
		 * doing these heavy things in ErrorContext. (There's one particular
		 * issue: these calls use CopyErrorData(), which asserts that we
		 * are not in ErrorContext.)
		 */
		oldcontext = MemoryContextSwitchTo(CurTransactionContext);

		PG_TRY();
		{
			cdbdisp_getDispatchResults(ds, &error);
			if (error != NULL)
			{
				FlushErrorState();
				ReThrowError(error);
			}
		}
		PG_CATCH();
		{
		}						/* nop; fall thru */
		PG_END_TRY();

		MemoryContextSwitchTo(oldcontext);
	}

	cdbdisp_destroyDispatcherState(ds);
}



/*
 * Allocate memory and initialize CdbDispatcherState.
 *
 * Call cdbdisp_destroyDispatcherState to free it.
 */
CdbDispatcherState *
cdbdisp_makeDispatcherState(bool isExtendedQuery)
{
	dispatcher_handle_t *handle;

	if (!isExtendedQuery)
	{
		if (numNonExtendedDispatcherState == 1)
		{
			ereport(ERROR,
					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					 errmsg("query plan with multiple segworker groups is not supported"),
					 errhint("likely caused by a function that reads or modifies data in a distributed table")));


		}

		numNonExtendedDispatcherState++;	
	}

	handle = allocate_dispatcher_handle();
	handle->dispatcherState->forceDestroyGang = false;
	handle->dispatcherState->isExtendedQuery = isExtendedQuery;
#ifdef USE_ASSERT_CHECKING
	handle->dispatcherState->isGangDestroying = false;
#endif
	handle->dispatcherState->allocatedGangs = NIL;
	handle->dispatcherState->largestGangSize = 0;
	handle->dispatcherState->rootGangSize = 0;
	handle->dispatcherState->destroyIdleReaderGang = false;

	return handle->dispatcherState;
}

void
cdbdisp_makeDispatchParams(CdbDispatcherState *ds,
							int maxSlices,
							char *queryText,
							int queryTextLen)
{
	MemoryContext oldContext;
	void *dispatchParams;

	Assert(DispatcherContext);
	oldContext = MemoryContextSwitchTo(DispatcherContext);

	dispatchParams = (pDispatchFuncs->makeDispatchParams) (maxSlices, ds->largestGangSize, queryText, queryTextLen);

	ds->dispatchParams = dispatchParams;

	MemoryContextSwitchTo(oldContext);
}

/*
 * Free memory in CdbDispatcherState
 *
 * Free dispatcher memory context.
 */
void
cdbdisp_destroyDispatcherState(CdbDispatcherState *ds)
{
	ListCell *lc;
	CdbDispatchResults *results;
	dispatcher_handle_t *h;

	if (!ds)
		return;
#ifdef USE_ASSERT_CHECKING
	/* Disallow reentrance. */
	Assert (!ds->isGangDestroying);
	ds->isGangDestroying = true;
#endif

	if (!ds->isExtendedQuery)
	{
		numNonExtendedDispatcherState--;	
		Assert(numNonExtendedDispatcherState == 0);
	}

	results = ds->primaryResults;
	h = find_dispatcher_handle(ds);

	if (results != NULL && results->resultArray != NULL)
	{
		int			i;

		for (i = 0; i < results->resultCount; i++)
		{
			cdbdisp_termResult(&results->resultArray[i]);
		}
		results->resultArray = NULL;
	}

	/*
	 * Recycle or destroy gang accordingly.
	 *
	 * We must recycle them in the reverse order of AllocateGang() to restore
	 * the original order of the idle gangs.
	 */
	foreach(lc, ds->allocatedGangs)
	{
		Gang *gp = lfirst(lc);

		RecycleGang(gp, ds->forceDestroyGang);
	}

	/*
	 * Destroy all the idle reader gangs when flag destroyIdleReaderGang is true
	 */
	if (ds->destroyIdleReaderGang)
		cdbcomponent_cleanupIdleQEs(false);

	ds->allocatedGangs = NIL;
	ds->dispatchParams = NULL;
	ds->primaryResults = NULL;
	ds->largestGangSize = 0;
	ds->rootGangSize = 0;

	if (h != NULL)
		destroy_dispatcher_handle(h);
}

void
cdbdisp_cancelDispatch(CdbDispatcherState *ds)
{
	cdbdisp_checkDispatchResult(ds, DISPATCH_WAIT_CANCEL);
}

bool
cdbdisp_checkForCancel(CdbDispatcherState *ds)
{
	if (pDispatchFuncs == NULL || pDispatchFuncs->checkForCancel == NULL)
		return false;
	return (pDispatchFuncs->checkForCancel) (ds);
}

/*
 * Return all file descriptors to wait for events from the QEs after
 * dispatching a query.
 *
 * nsocks is the returned socket fds number (as an output param):
 *
 * Return value is the array of socket fds.
 * It will be palloced in pDispatchFuncs->getWaitSocketFd(), and contains
 * all waiting socket fds. (Note: caller need to pfree it)
 *
 * This is intended for use with cdbdisp_checkForCancel(). First call
 * cdbdisp_getWaitSocketFds(), and wait on that sockets to become readable
 * e.g. with select() or poll(). When it becomes readable, call
 * cdbdisp_checkForCancel() to process the incoming data, and repeat.
 */
int *
cdbdisp_getWaitSocketFds(CdbDispatcherState *ds, int *nsocks)
{
	if (pDispatchFuncs == NULL || pDispatchFuncs->getWaitSocketFds == NULL)
		return NULL;
	return (pDispatchFuncs->getWaitSocketFds) (ds, nsocks);
}

dispatcher_handle_t *
allocate_dispatcher_handle(void)
{
	dispatcher_handle_t	*h;

	if (DispatcherContext == NULL)
		DispatcherContext = AllocSetContextCreate(TopMemoryContext,
												 "Dispatch Context",
												 ALLOCSET_DEFAULT_MINSIZE,
												 ALLOCSET_DEFAULT_INITSIZE,
												 ALLOCSET_DEFAULT_MAXSIZE);


	h = MemoryContextAllocZero(DispatcherContext, sizeof(dispatcher_handle_t));

	h->dispatcherState = MemoryContextAllocZero(DispatcherContext, sizeof(CdbDispatcherState));
	h->owner = CurrentResourceOwner;
	h->next = open_dispatcher_handles;
	h->prev = NULL;
	if (open_dispatcher_handles)
		open_dispatcher_handles->prev = h;
	open_dispatcher_handles = h;

	return h;
}

static void
destroy_dispatcher_handle(dispatcher_handle_t *h)
{
	h->dispatcherState = NULL;

	/* unlink from linked list first */
	if (h->prev)
		h->prev->next = h->next;
	else
		open_dispatcher_handles = h->next;
	if (h->next)
		h->next->prev = h->prev;

	pfree(h);

	if (open_dispatcher_handles == NULL)
		MemoryContextReset(DispatcherContext);
}

static dispatcher_handle_t *
find_dispatcher_handle(CdbDispatcherState *ds)
{
	dispatcher_handle_t *head = open_dispatcher_handles;
	while (head != NULL)
	{
		if (head->dispatcherState == ds)
			return head;
		head = head->next;
	}
	return NULL;
}

static void
cleanup_dispatcher_handle(dispatcher_handle_t *h)
{
	if (h->dispatcherState == NULL)
	{
		destroy_dispatcher_handle(h);
		return;
	}

	cdbdisp_cancelDispatch(h->dispatcherState);
	cdbdisp_destroyDispatcherState(h->dispatcherState);
}

/*
 * Cleanup all dispatcher state that belong to
 * current resource owner and its childrens
 */
void
AtAbort_DispatcherState(void)
{
	if (Gp_role != GP_ROLE_DISPATCH)
		return;

	if (CurrentGangCreating != NULL)
	{
		RecycleGang(CurrentGangCreating, true);
		CurrentGangCreating = NULL;
	}

	/*
	 * Cleanup all outbound dispatcher states belong to
	 * current resource owner and its children
	 */
	CdbResourceOwnerWalker(CurrentResourceOwner, cdbdisp_cleanupDispatcherHandle);

	Assert(open_dispatcher_handles == NULL);

	/*
	 * If primary writer gang is destroyed in current Gxact
	 * reset session and drop temp files
	 */
	if (currentGxactWriterGangLost())
		ResetAllGangs();
}

void
AtSubAbort_DispatcherState(void)
{
	if (Gp_role != GP_ROLE_DISPATCH)
		return;

	if (CurrentGangCreating != NULL)
	{
		RecycleGang(CurrentGangCreating, true);
		CurrentGangCreating = NULL;
	}

	CdbResourceOwnerWalker(CurrentResourceOwner, cdbdisp_cleanupDispatcherHandle);
}

void
cdbdisp_cleanupDispatcherHandle(const struct ResourceOwnerData *owner)
{
	dispatcher_handle_t *curr;
	dispatcher_handle_t *next;

	next = open_dispatcher_handles;
	while (next)
	{
		curr = next;
		next = curr->next;

		if (curr->owner == owner)
		{
			cleanup_dispatcher_handle(curr);
		}
	}
}

/*
 * segmentsListToString
 *		Utility routine to convert a segment list into a string.
 */
static char *
segmentsListToString(const char *prefix, List *segments)
{
	StringInfoData string;
	ListCell   *l;

	initStringInfo(&string);
	appendStringInfo(&string, "%s: ", prefix);

	foreach(l, segments)
	{
		int segID = lfirst_int(l);

		appendStringInfo(&string, "%d ", segID);
	}

	return string.data;
}

char*
segmentsToContentStr(List *segments)
{
	int size = list_length(segments);

	if (size == 0)
		return "ALL contents";
	else if (size == 1)
		return "SINGLE content";
	else if (size < getgpsegmentCount())
		return segmentsListToString("PARTIAL contents", segments);
	else
		return segmentsListToString("ALL contents", segments);
}

相关信息

greenplumn 源码目录

相关文章

greenplumn cdbconn 源码

greenplumn cdbdisp_async 源码

greenplumn cdbdisp_dtx 源码

greenplumn cdbdisp_query 源码

greenplumn cdbdispatchresult 源码

greenplumn cdbgang 源码

greenplumn cdbgang_async 源码

greenplumn cdbpq 源码

0  赞