greenplumn cdbdisp_async 源码

  • 2022-08-18
  • 浏览 (452)

greenplumn cdbdisp_async 代码

文件路径:/src/backend/cdb/dispatcher/cdbdisp_async.c

/*-------------------------------------------------------------------------
 *
 * cdbdisp_async.c
 *	  Functions for asynchronous implementation of dispatching
 *	  commands to QExecutors.
 *
 * Portions Copyright (c) 2005-2008, Greenplum inc
 * Portions Copyright (c) 2012-Present VMware, Inc. or its affiliates.
 *
 *
 * IDENTIFICATION
 *	    src/backend/cdb/dispatcher/cdbdisp_async.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#ifdef HAVE_POLL_H
#include <poll.h>
#endif
#ifdef HAVE_SYS_POLL_H
#include <sys/poll.h>
#endif

#include "pgstat.h"
#include "storage/ipc.h"		/* For proc_exit_inprogress  */
#include "tcop/tcopprot.h"
#include "cdb/cdbdisp.h"
#include "cdb/cdbdisp_async.h"
#include "cdb/cdbdispatchresult.h"
#include "libpq-fe.h"
#include "libpq-int.h"
#include "cdb/cdbfts.h"
#include "cdb/cdbgang.h"
#include "cdb/cdbvars.h"
#include "cdb/cdbpq.h"
#include "miscadmin.h"
#include "commands/sequence.h"
#include "access/xact.h"
#include "utils/timestamp.h"
#define DISPATCH_WAIT_TIMEOUT_MSEC 2000

/*
 * Ideally, we should set timeout to zero to cancel QEs as soon as possible,
 * but considering the cost of sending cancel signal is high, we want to process
 * as many finishing QEs as possible before cancelling
 */
#define DISPATCH_WAIT_CANCEL_TIMEOUT_MSEC 100

/*
 * DISPATCH_NO_WAIT means return immediate when there's no more data,
 * DISPATCH_WAIT_UNTIL_FINISH means wait until all dispatch works are completed.
 */
#define DISPATCH_NO_WAIT 0
#define DISPATCH_WAIT_UNTIL_FINISH -1

typedef struct CdbDispatchCmdAsync
{

	/*
	 * dispatchResultPtrArray: Array[0..dispatchCount-1] of CdbDispatchResult*
	 * Each CdbDispatchResult object points to a SegmentDatabaseDescriptor
	 * that dispatcher will send the command to.
	 */
	struct CdbDispatchResult **dispatchResultPtrArray;

	/* Number of segment DBs dispatched */
	int			dispatchCount;

	/*
	 * Depending on this mode, we may send query cancel or query finish
	 * message to QE while we are waiting it to complete.  NONE means we
	 * expect QE to complete without any instruction.
	 */
	volatile DispatchWaitMode waitMode;

	/*
	 * When waitMode is set to DISPATCH_WAIT_ACK_ROOT,
	 * the expected acknowledge message from QE should be specified.
	 * This field stores the expected acknowledge message.
	 */
	const char	*ackMessage;

	/*
	 * Text information to dispatch: The format is type(1 byte) + length(size
	 * of int) + content(n bytes)
	 *
	 * For DTX command, type is 'T', it's built by function
	 * buildGpDtxProtocolCommand. For query, type is 'M', it's built by
	 * function buildGpQueryString.
	 */
	char	   *query_text;
	int			query_text_len;

} CdbDispatchCmdAsync;

static void *cdbdisp_makeDispatchParams_async(int maxSlices, int largestGangSize, char *queryText, int len);

static bool cdbdisp_checkAckMessage_async(struct CdbDispatcherState *ds, const char *message,
									int timeout_sec);

static void cdbdisp_checkDispatchResult_async(struct CdbDispatcherState *ds,
								  DispatchWaitMode waitMode);

static void cdbdisp_dispatchToGang_async(struct CdbDispatcherState *ds,
							 struct Gang *gp,
							 int sliceIndex);
static void	cdbdisp_waitDispatchFinish_async(struct CdbDispatcherState *ds);

static bool	cdbdisp_checkForCancel_async(struct CdbDispatcherState *ds);
static int *cdbdisp_getWaitSocketFds_async(struct CdbDispatcherState *ds, int *nsocks);

DispatcherInternalFuncs DispatcherAsyncFuncs =
{
	cdbdisp_checkForCancel_async,
	cdbdisp_getWaitSocketFds_async,
	cdbdisp_makeDispatchParams_async,
	cdbdisp_checkAckMessage_async,
	cdbdisp_checkDispatchResult_async,
	cdbdisp_dispatchToGang_async,
	cdbdisp_waitDispatchFinish_async
};


static void dispatchCommand(CdbDispatchResult *dispatchResult,
				const char *query_text,
				int query_text_len);

static void checkDispatchResult(CdbDispatcherState *ds, int timeout_sec);

static void checkDispatchResultLoop(CdbDispatcherState *ds, int timeout_sec, WaitEventSet *waitset);

static void cdbdisp_waitDispatchFinishLoop_async(struct CdbDispatcherState *ds, WaitEventSet *waitset);

static bool processResults(CdbDispatchResult *dispatchResult);

static void
			signalQEs(CdbDispatchCmdAsync *pParms);

static void
			checkSegmentAlive(CdbDispatchCmdAsync *pParms);

static void
			handlePollError(CdbDispatchCmdAsync *pParms);

static void
			handlePollSuccess(CdbDispatchCmdAsync *pParms, WaitEvent *revents, int nready);

static bool
			checkAckMessage(CdbDispatchResult *dispatchResult, const char *message);

/*
 * Check dispatch result.
 * Don't wait all dispatch commands to complete.
 *
 * Return true if any connection received error.
 */
static bool
cdbdisp_checkForCancel_async(struct CdbDispatcherState *ds)
{
	Assert(ds);

	checkDispatchResult(ds, DISPATCH_NO_WAIT);
	return cdbdisp_checkResultsErrcode(ds->primaryResults);
}

/*
 * Return all FDs to wait for, after dispatching.
 *
 * nsocks is the returned socket fds number (as an output param):
 *
 * Return value is the array of waiting socket fds.
 * It's be palloced in this function, so caller need to pfree it.
 */
static int *
cdbdisp_getWaitSocketFds_async(struct CdbDispatcherState *ds, int *nsocks)
{
	CdbDispatchCmdAsync *pParms = (CdbDispatchCmdAsync *) ds->dispatchParams;
	int			i;
	int			*fds = NULL;

	Assert(ds);

	*nsocks = 0;
	if (proc_exit_inprogress)
		return NULL;

	fds = palloc(pParms->dispatchCount * sizeof(int));

	/*
	 * This should match the logic in cdbdisp_checkForCancel_async(). In
	 * particular, when cdbdisp_checkForCancel_async() is called, it must
	 * process any incoming data from the socket we return here, or we
	 * will busy wait.
	 */
	for (i = 0; i < pParms->dispatchCount; i++)
	{
		CdbDispatchResult *dispatchResult;
		SegmentDatabaseDescriptor *segdbDesc;

		dispatchResult = pParms->dispatchResultPtrArray[i];
		segdbDesc = dispatchResult->segdbDesc;

		Assert(!cdbconn_isBadConnection(segdbDesc));

		(fds)[(*nsocks)++] = PQsocket(segdbDesc->conn);
	}

	return fds;
}

/*
 * Block until all data are dispatched.
 */
static void
cdbdisp_waitDispatchFinish_async(struct CdbDispatcherState *ds)
{
	CdbDispatchCmdAsync *pParms = (CdbDispatchCmdAsync *) ds->dispatchParams;
	int				dispatchCount = pParms->dispatchCount;
	WaitEventSet 	*volatile waitset = CreateWaitEventSet(CurrentMemoryContext, dispatchCount);

	/* Use PG_TRY() - PG_CATCH() to make sure destroy the waiteventset (close the epoll fd) */
	PG_TRY();
	{
		cdbdisp_waitDispatchFinishLoop_async(ds, waitset);
	}
	PG_CATCH();
	{
		FreeWaitEventSet(waitset);
		PG_RE_THROW();
	}
	PG_END_TRY();

	FreeWaitEventSet(waitset);
}

static void
cdbdisp_waitDispatchFinishLoop_async(struct CdbDispatcherState *ds, WaitEventSet *waitset)
{
	const static int DISPATCH_POLL_TIMEOUT = 500;
	int			i;
	CdbDispatchCmdAsync *pParms = (CdbDispatchCmdAsync *) ds->dispatchParams;
	int			dispatchCount = pParms->dispatchCount;

	WaitEvent 		*revents = palloc(sizeof(WaitEvent) * dispatchCount);
	int 			*added = palloc0(sizeof(int) * dispatchCount);
	while (true)
	{
		int			pollRet;
		bool		allFlushed = true;

		for (i = 0; i < dispatchCount; i++)
		{
			CdbDispatchResult *qeResult = pParms->dispatchResultPtrArray[i];
			SegmentDatabaseDescriptor *segdbDesc = qeResult->segdbDesc;
			PGconn	   *conn = segdbDesc->conn;
			int			ret = 0;

			/* skip already completed connections */
			if (conn->outCount == 0)
				continue;

			/*
			 * call send for this connection regardless of its POLLOUT status,
			 * because it may be writable NOW
			 */
			ret = pqFlushNonBlocking(conn);

			if (ret == 0)
				continue;
			else if (ret > 0)
			{
				/* add segment sock to the waitset */
				if (!added[i])
				{
					int 	sock = PQsocket(conn);
					Assert(sock >= 0);
					AddWaitEventToSet(waitset, WL_SOCKET_WRITEABLE, sock, NULL, NULL);
					added[i] = 1;
				}

				allFlushed = false;
			}
			else if (ret < 0)
			{
				/* error message should be set up already */
				char	   *msg = PQerrorMessage(conn);

				qeResult->stillRunning = false;
				ereport(ERROR,
						(errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
						errmsg("Command could not be dispatch to segment %s: %s", qeResult->segdbDesc->whoami, msg ? msg : "unknown error")));
			}
		}

		if (allFlushed)
			break;

		/* guarantee poll() is interruptible */
		do
		{
			CHECK_FOR_INTERRUPTS();

			pollRet = WaitEventSetWait(waitset, DISPATCH_POLL_TIMEOUT, revents, dispatchCount, WAIT_EVENT_DISP_FINISH);
			Assert(pollRet >= 0);
			if (pollRet == 0)
				ELOG_DISPATCHER_DEBUG("cdbdisp_waitDispatchFinish_async(): Dispatch poll timeout after %d ms", DISPATCH_POLL_TIMEOUT);
		}
		while (pollRet == 0);
	}
	pfree(revents);
	pfree(added);
}

/*
 * Dispatch command to gang.
 *
 * Throw out error to upper try-catch block if anything goes wrong. This function only kicks off dispatching,
 * call cdbdisp_waitDispatchFinish_async to ensure the completion
 */
static void
cdbdisp_dispatchToGang_async(struct CdbDispatcherState *ds,
							 struct Gang *gp,
							 int sliceIndex)
{
	int			i;

	CdbDispatchCmdAsync *pParms = (CdbDispatchCmdAsync *) ds->dispatchParams;

	/*
	 * Start the dispatching
	 */
	for (i = 0; i < gp->size; i++)
	{
		CdbDispatchResult *qeResult;

		SegmentDatabaseDescriptor *segdbDesc = gp->db_descriptors[i];

		Assert(segdbDesc != NULL);

		/*
		 * Initialize the QE's CdbDispatchResult object.
		 */
		qeResult = cdbdisp_makeResult(ds->primaryResults, segdbDesc, sliceIndex);
		if (qeResult == NULL)
		{
			elog(FATAL, "could not allocate resources for segworker communication");
		}
		pParms->dispatchResultPtrArray[pParms->dispatchCount++] = qeResult;

		dispatchCommand(qeResult, pParms->query_text, pParms->query_text_len);
	}
}

/*
 * Check the specified acknowledge messages from QEs.
 *
 * Check all dispatch connections to get expected acknowledge message.
 * Return true if all required QEs' acknowledge messages have been received.
 *
 * message: specifies the expected ACK message to check.
 * timeout_sec: the second that the dispatcher waits for the ack messages at most.
 *              0 means checking immediately, and -1 means waiting until all ack
 *              messages are received.
 */
static bool
cdbdisp_checkAckMessage_async(struct CdbDispatcherState *ds, const char *message,
							  int timeout_sec)
{
	DispatchWaitMode prevWaitMode;
	CdbDispatchCmdAsync *pParms;
	bool receivedAll = true;

	Assert(ds);

	pParms = (CdbDispatchCmdAsync *) ds->dispatchParams;
	/* If cdbdisp_destroyDispatcherState is called */
	if (pParms == NULL || message == NULL)
		return false;

	pParms->ackMessage = message;
	prevWaitMode = pParms->waitMode;
	pParms->waitMode = DISPATCH_WAIT_ACK_ROOT;

	for (int i = 0; i < pParms->dispatchCount; i++)
		pParms->dispatchResultPtrArray[i]->receivedAckMsg = false;

	checkDispatchResult(ds, timeout_sec);

	for (int i = 0; i < pParms->dispatchCount; i++)
	{
		if (!pParms->dispatchResultPtrArray[i]->receivedAckMsg &&
			pParms->dispatchResultPtrArray[i]->stillRunning)
		{
			receivedAll = false;
			break;
		}
	}

	pParms->waitMode = prevWaitMode;
	pParms->ackMessage = NULL;

	return receivedAll;
}

/*
 * Check dispatch result.
 *
 * Wait all dispatch work to complete, either success or fail.
 * (Set stillRunning to true when one dispatch work is completed)
 */
static void
cdbdisp_checkDispatchResult_async(struct CdbDispatcherState *ds,
								  DispatchWaitMode waitMode)
{
	Assert(ds != NULL);
	CdbDispatchCmdAsync *pParms = (CdbDispatchCmdAsync *) ds->dispatchParams;

	/* cdbdisp_destroyDispatcherState is called */
	if (pParms == NULL)
		return;

	/*
	 * Don't overwrite DISPATCH_WAIT_CANCEL or DISPATCH_WAIT_FINISH with
	 * DISPATCH_WAIT_NONE
	 */
	if (waitMode != DISPATCH_WAIT_NONE)
		pParms->waitMode = waitMode;

	checkDispatchResult(ds, DISPATCH_WAIT_UNTIL_FINISH);
}

/*
 * Allocates memory for a CdbDispatchCmdAsync structure and do the initialization.
 *
 * Memory will be freed in function cdbdisp_destroyDispatcherState by deleting the
 * memory context.
 */
static void *
cdbdisp_makeDispatchParams_async(int maxSlices, int largestGangSize, char *queryText, int len)
{
	int			maxResults = maxSlices * largestGangSize;
	int			size = 0;

	CdbDispatchCmdAsync *pParms = palloc0(sizeof(CdbDispatchCmdAsync));

	size = maxResults * sizeof(CdbDispatchResult *);
	pParms->dispatchResultPtrArray = (CdbDispatchResult **) palloc0(size);
	pParms->dispatchCount = 0;
	pParms->waitMode = DISPATCH_WAIT_NONE;
	pParms->ackMessage = NULL;
	pParms->query_text = queryText;
	pParms->query_text_len = len;

	return (void *) pParms;
}

/*
 * Receive and process results from all running QEs.
 * timeout_sec: the second that the dispatcher waits for the ack messages at most.
 *              DISPATCH_NO_WAIT(0): return immediate when there's no more data.
 *              DISPATCH_WAIT_UNTIL_FINISH(-1): wait until all dispatch works are completed.
 */
static void
checkDispatchResult(CdbDispatcherState *ds, int timeout_sec)
{
	CdbDispatchCmdAsync *pParms = (CdbDispatchCmdAsync *) ds->dispatchParams;
	WaitEventSet *volatile waitset = CreateWaitEventSet(CurrentMemoryContext, pParms->dispatchCount);

	/* Use PG_TRY() - PG_CATCH() to make sure destroy the waiteventset (close the epoll fd) */
	PG_TRY();
	{
		checkDispatchResultLoop(ds, timeout_sec, waitset);
	}
	PG_CATCH();
	{
		FreeWaitEventSet(waitset);
		PG_RE_THROW();
	}
	PG_END_TRY();

	FreeWaitEventSet(waitset);
}

static void
checkDispatchResultLoop(CdbDispatcherState *ds, int timeout_sec, WaitEventSet *waitset)
{
	int			i;
	int			timeout = 0;
	bool		sentSignal = false;
	uint8 ftsVersion = 0;
	struct timeval start_ts, now;
	int64		diff_us;

	CdbDispatchCmdAsync *pParms = (CdbDispatchCmdAsync *) ds->dispatchParams;
	CdbDispatchResults *meleeResults = ds->primaryResults;
	SegmentDatabaseDescriptor *segdbDesc;
	CdbDispatchResult *dispatchResult;

	int 	db_count = pParms->dispatchCount;
	int 	*added = palloc0(db_count * sizeof(int));
	WaitEvent *revents = palloc(sizeof(WaitEvent) * db_count);

	/*
	 * OK, we are finished submitting the command to the segdbs. Now, we have
	 * to wait for them to finish.
	 */
	gettimeofday(&start_ts, NULL);
	for (;;)
	{
		int			n;
		int			nfds = 0;
		int			ack_count = 0;
		PGconn		*conn;

		/*
		 * bail-out if we are dying. Once QD dies, QE will recognize it
		 * shortly anyway.
		 */
		if (proc_exit_inprogress)
			break;

		/*
		 * Current loop might last for the long time so check on interrupts.
		 */
		CHECK_FOR_INTERRUPTS();

		/*
		 * escalate waitMode to cancel if:
		 * - user cancel request has occurred,
		 * - or an error has been reported by any QE,
		 * - in case the caller wants cancelOnError
		 */
		if ((CancelRequested() || meleeResults->errcode) && meleeResults->cancelOnError)
			pParms->waitMode = DISPATCH_WAIT_CANCEL;

		/*
		 * Which QEs are still running and could send results to us?
		 */
		for (i = 0; i < db_count; i++)
		{
			dispatchResult = pParms->dispatchResultPtrArray[i];
			segdbDesc = dispatchResult->segdbDesc;
			conn = segdbDesc->conn;

			if (pParms->waitMode == DISPATCH_WAIT_ACK_ROOT &&
				checkAckMessage(dispatchResult, pParms->ackMessage))
			{
				ack_count++;
				continue;
			}

			/*
			 * Already finished with this QE?
			 */
			if (!dispatchResult->stillRunning)
				continue;

			Assert(!cdbconn_isBadConnection(segdbDesc));

			/*
			 * Flush out buffer in case some commands are not fully
			 * dispatched to QEs, this can prevent QD from polling
			 * on such QEs forever.
			 */
			if (conn->outCount > 0)
			{
				/*
				 * Don't error out here, let following poll() routine to
				 * handle it.
				 */
				if (pqFlush(conn) < 0)
					elog(LOG, "Failed flushing outbound data to %s:%s",
						 segdbDesc->whoami, PQerrorMessage(conn));
			}

			/* add segment sock to the waitset */
			if (!added[i])
			{
				int 	sock = PQsocket(conn);
				long 	ev_userdata = i; /* the index "i" as the event's userdata */
				Assert(sock >= 0);
				AddWaitEventToSet(waitset, WL_SOCKET_READABLE, sock, NULL, (void *)ev_userdata);
				added[i] = 1;
			}
			nfds++;
		}

		/*
		 * Break out when no QEs still running or required QEs acked.
		 */
		if (nfds <= 0 ||
			(pParms->waitMode == DISPATCH_WAIT_ACK_ROOT && ack_count == ds->rootGangSize))
			break;

		/*
		 * Wait for results from QEs
		 *
		 * Don't wait if: - this is called from interconnect to check if
		 * there's any error.
		 *
		 * Lower the timeout if: - we need send signal to QEs.
		 */
		if (timeout_sec == 0)
			timeout = 0;
		else if (pParms->waitMode == DISPATCH_WAIT_NONE ||
				 pParms->waitMode == DISPATCH_WAIT_ACK_ROOT ||
				 sentSignal)
			timeout = DISPATCH_WAIT_TIMEOUT_MSEC;
		else
			timeout = DISPATCH_WAIT_CANCEL_TIMEOUT_MSEC;

		n = WaitEventSetWait(waitset, timeout, revents, db_count, WAIT_EVENT_DISP_RESULT);

		/*
		 * poll returns with an error, including one due to an interrupted
		 * call
		 */
		if (n < 0)
		{
			int			sock_errno = SOCK_ERRNO;

			if (sock_errno == EINTR)
				continue;

			elog(LOG, "handlePollError poll() failed; errno=%d", sock_errno);

			handlePollError(pParms);

			/*
			 * Since an error was detected for the segment, request
			 * FTS to perform a probe before checking the segment
			 * state.
			 */
			FtsNotifyProber();
			checkSegmentAlive(pParms);

			if (pParms->waitMode != DISPATCH_WAIT_NONE &&
				pParms->waitMode != DISPATCH_WAIT_ACK_ROOT)
			{
				signalQEs(pParms);
				sentSignal = true;
			}

			gettimeofday(&now, NULL);
			diff_us = (now.tv_sec - start_ts.tv_sec) * 1000000;
			diff_us += (int) now.tv_usec - (int) start_ts.tv_usec;
			if (timeout_sec >= 0 && diff_us >= timeout_sec * 1000000L)
				break;
		}
		/* If the time limit expires, poll() returns 0 */
		else if (n == 0)
		{
			if (pParms->waitMode != DISPATCH_WAIT_NONE &&
				pParms->waitMode != DISPATCH_WAIT_ACK_ROOT)
			{
				signalQEs(pParms);
				sentSignal = true;
			}

			/*
			 * This code relies on FTS being triggered at regular
			 * intervals. Iff FTS detects change in configuration
			 * then check segment state. FTS probe is not triggered
			 * explicitly in this case because this happens every
			 * DISPATCH_WAIT_TIMEOUT_MSEC.
			 */
			if (ftsVersion == 0 || ftsVersion != getFtsVersion())
			{
				ftsVersion = getFtsVersion();
				checkSegmentAlive(pParms);
			}

			gettimeofday(&now, NULL);
			diff_us = (now.tv_sec - start_ts.tv_sec) * 1000000;
			diff_us += (int) now.tv_usec - (int) start_ts.tv_usec;
			if (timeout_sec >= 0 && diff_us >= timeout_sec * 1000000L)
				break;
		}
		/* We have data waiting on one or more of the connections. */
		else
			handlePollSuccess(pParms, revents, n);
	} /* for (;;) */

	pfree(revents);
	pfree(added);
}

/*
 * Helper function that actually kicks off the command on the libpq connection.
 */
static void
dispatchCommand(CdbDispatchResult *dispatchResult,
				const char *query_text,
				int query_text_len)
{
	TimestampTz beforeSend = 0;
	long		secs;
	int			usecs;

	if (DEBUG1 >= log_min_messages)
		beforeSend = GetCurrentTimestamp();

	/*
	 * Submit the command asynchronously.
	 */
	if (PQsendGpQuery_shared(dispatchResult->segdbDesc->conn, (char *) query_text, query_text_len, true) == 0)
	{
		char	   *msg = PQerrorMessage(dispatchResult->segdbDesc->conn);

		dispatchResult->stillRunning = false;
		ereport(ERROR,
				(errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
				 errmsg("Command could not be dispatch to segment %s: %s",
						dispatchResult->segdbDesc->whoami, msg ? msg : "unknown error")));
	}

	forwardQENotices();

	if (DEBUG1 >= log_min_messages)
	{
		TimestampDifference(beforeSend, GetCurrentTimestamp(), &secs, &usecs);

		if (secs != 0 || usecs > 1000)	/* Time > 1ms? */
			elog(LOG, "time for PQsendGpQuery_shared %ld.%06d", secs, usecs);
	}

	/*
	 * We'll keep monitoring this QE -- whether or not the command was
	 * dispatched -- in order to check for a lost connection or any other
	 * errors that libpq might have in store for us.
	 */
	dispatchResult->stillRunning = true;
	dispatchResult->hasDispatched = true;

	ELOG_DISPATCHER_DEBUG("Command dispatched to QE (%s)", dispatchResult->segdbDesc->whoami);
}

/*
 * Helper function to check whether specified acknowledge message has been
 * received.
 *
 * Check whether the current required acknowledge message is already received
 * in the ackPGNotifies queue.
 */
static bool
checkAckMessage(CdbDispatchResult *dispatchResult, const char *message)
{
	bool received = false;
	PGnotify* ackNotifies = (PGnotify *) dispatchResult->ackPGNotifies;

	if (!message)
		elog(ERROR, "Notify ACK message is required.");

	if (dispatchResult->receivedAckMsg)
		return true;

	while (ackNotifies)
	{
		if (strcmp(ackNotifies->extra, message) == 0)
		{
			received = true;
			dispatchResult->receivedAckMsg = true;
			break;
		}
		ackNotifies = ackNotifies->next;
	}
	return received;
}

/*
 * Helper function to checkDispatchResult that handles errors that occur
 * during the poll() call.
 *
 * NOTE: The cleanup of the connections will be performed by handlePollTimeout().
 */
static void
handlePollError(CdbDispatchCmdAsync *pParms)
{
	int			i;

	for (i = 0; i < pParms->dispatchCount; i++)
	{
		CdbDispatchResult *dispatchResult = pParms->dispatchResultPtrArray[i];
		SegmentDatabaseDescriptor *segdbDesc = dispatchResult->segdbDesc;

		/* Skip if already finished or didn't dispatch. */
		if (!dispatchResult->stillRunning)
			continue;

		if (pParms->waitMode == DISPATCH_WAIT_ACK_ROOT &&
				 dispatchResult->receivedAckMsg)
			continue;

		/* We're done with this QE, sadly. */
		if (PQstatus(segdbDesc->conn) == CONNECTION_BAD)
		{
			char	   *msg = PQerrorMessage(segdbDesc->conn);

			if (msg)
				elog(LOG, "Dispatcher encountered connection error on %s: %s", segdbDesc->whoami, msg);

			elog(LOG, "Dispatcher noticed bad connection in handlePollError()");

			/* Save error info for later. */
			cdbdisp_appendMessageNonThread(dispatchResult, LOG,
										   "Error after dispatch from %s: %s",
										   segdbDesc->whoami,
										   msg ? msg : "unknown error");

			PQfinish(segdbDesc->conn);
			segdbDesc->conn = NULL;
			dispatchResult->stillRunning = false;
		}
	}
	forwardQENotices();

	return;
}

/*
 * Receive and process results from QEs.
 */
static void
handlePollSuccess(CdbDispatchCmdAsync *pParms,
				  WaitEvent *revents, int nready)
{
	int			i = 0;

	/*
	 * We have data waiting on one or more of the connections.
	 */
	for (i = 0; i < nready; i++)
	{
		long pos = (long)(revents[i].user_data); /* original position */
		bool		finished;
		int			sock;
		CdbDispatchResult *dispatchResult = pParms->dispatchResultPtrArray[pos];
		SegmentDatabaseDescriptor *segdbDesc = dispatchResult->segdbDesc;

		/*
		 * Skip if already finished or didn't dispatch.
		 */
		if (!dispatchResult->stillRunning)
			continue;

		if (pParms->waitMode == DISPATCH_WAIT_ACK_ROOT &&
				 dispatchResult->receivedAckMsg)
			continue;

		ELOG_DISPATCHER_DEBUG("looking for results from %ld of %d (%s)",
							  pos + 1, pParms->dispatchCount, segdbDesc->whoami);

		sock = PQsocket(segdbDesc->conn);
		Assert(sock >= 0);
		Assert(sock == revents[i].fd);

		/*
		 * Skip this connection if it has no input available.
		if (!(fds[currentFdNumber++].revents & POLLIN))
			continue;
		 */

		ELOG_DISPATCHER_DEBUG("PQsocket says there are results from %ld of %d (%s)",
							  pos + 1, pParms->dispatchCount, segdbDesc->whoami);

		/*
		 * Receive and process results from this QE.
		 */
		finished = processResults(dispatchResult);

		/*
		 * Are we through with this QE now?
		 */
		if (finished)
		{
			dispatchResult->stillRunning = false;

			ELOG_DISPATCHER_DEBUG("processResults says we are finished with %ld of %d (%s)",
								  pos + 1, pParms->dispatchCount, segdbDesc->whoami);

			if (DEBUG1 >= log_min_messages)
			{
				char		msec_str[32];

				switch (check_log_duration(msec_str, false))
				{
					case 1:
					case 2:
						elog(LOG, "duration to dispatch result received from %ld (seg %d): %s ms",
							 pos + 1, dispatchResult->segdbDesc->segindex, msec_str);
						break;
				}
			}

			if (PQisBusy(dispatchResult->segdbDesc->conn))
				elog(DEBUG1, "did not receive query results on libpq connection %s",
					 dispatchResult->segdbDesc->whoami);
		}
		else
			ELOG_DISPATCHER_DEBUG("processResults says we have more to do with %ld of %d (%s)",
								  pos + 1, pParms->dispatchCount, segdbDesc->whoami);
	}
}

/*
 * Send finish or cancel signal to QEs if needed.
 */
static void
signalQEs(CdbDispatchCmdAsync *pParms)
{
	int			i;
	DispatchWaitMode waitMode = pParms->waitMode;

	for (i = 0; i < pParms->dispatchCount; i++)
	{
		char		errbuf[256];
		bool		sent = false;
		CdbDispatchResult *dispatchResult = pParms->dispatchResultPtrArray[i];

		Assert(dispatchResult != NULL);
		SegmentDatabaseDescriptor *segdbDesc = dispatchResult->segdbDesc;

		/*
		 * Don't send the signal if - QE is finished or canceled - the signal
		 * was already sent - connection is dead
		 */

		if (!dispatchResult->stillRunning ||
			dispatchResult->wasCanceled ||
			(pParms->waitMode == DISPATCH_WAIT_ACK_ROOT &&
			 dispatchResult->receivedAckMsg) ||
			cdbconn_isBadConnection(segdbDesc))
			continue;

		memset(errbuf, 0, sizeof(errbuf));

		sent = cdbconn_signalQE(segdbDesc, errbuf, waitMode == DISPATCH_WAIT_CANCEL);
		if (sent)
			dispatchResult->sentSignal = waitMode;
		else
			elog(LOG, "Unable to cancel: %s",
				 strlen(errbuf) == 0 ? "cannot allocate PGCancel" : errbuf);
	}
}

/*
 * Check if any segment DB down is detected by FTS.
 */
static void
checkSegmentAlive(CdbDispatchCmdAsync *pParms)
{
	int			i;

	/*
	 * check the connection still valid
	 */
	for (i = 0; i < pParms->dispatchCount; i++)
	{
		CdbDispatchResult *dispatchResult = pParms->dispatchResultPtrArray[i];
		SegmentDatabaseDescriptor *segdbDesc = dispatchResult->segdbDesc;

		/*
		 * Skip if already finished or didn't dispatch.
		 */
		if (!dispatchResult->stillRunning)
			continue;

		/*
		 * Skip the entry db.
		 */
		if (segdbDesc->segindex < 0)
			continue;

		ELOG_DISPATCHER_DEBUG("FTS testing connection %d of %d (%s)",
							  i + 1, pParms->dispatchCount, segdbDesc->whoami);

		if (FtsIsSegmentDown(segdbDesc->segment_database_info))
		{
			char	   *msg = PQerrorMessage(segdbDesc->conn);

			dispatchResult->stillRunning = false;
			cdbdisp_appendMessageNonThread(dispatchResult, LOG,
										   "FTS detected connection lost during dispatch to %s: %s",
										   dispatchResult->segdbDesc->whoami, msg ? msg : "unknown error");

			/*
			 * Not a good idea to store into the PGconn object. Instead, just
			 * close it.
			 */
			PQfinish(segdbDesc->conn);
			segdbDesc->conn = NULL;
		}
	}
}

static inline void
send_sequence_response(PGconn *conn, Oid oid, int64 last, int64 cached, int64 increment, bool overflow, bool error)
{
	if (pqPutMsgStart(SEQ_NEXTVAL_QUERY_RESPONSE, false, conn) < 0)
		elog(ERROR, "Failed to send sequence response: %s", PQerrorMessage(conn));
	pqPutInt(oid, 4, conn);
	pqPutInt(last >> 32, 4, conn);
	pqPutInt(last, 4, conn);
	pqPutInt(cached >> 32, 4, conn);
	pqPutInt(cached, 4, conn);
	pqPutInt(increment >> 32, 4, conn);
	pqPutInt(increment, 4, conn);
	pqPutc(overflow ? SEQ_NEXTVAL_TRUE : SEQ_NEXTVAL_FALSE, conn);
	pqPutc(error ? SEQ_NEXTVAL_TRUE : SEQ_NEXTVAL_FALSE, conn);
	if (pqPutMsgEnd(conn) < 0)
		elog(ERROR, "Failed to send sequence response: %s", PQerrorMessage(conn));
	if (pqFlush(conn) < 0)
		elog(ERROR, "Failed to send sequence response: %s", PQerrorMessage(conn));
}

/*
 * Receive and process input from one QE.
 *
 * Return true if all input are consumed or the connection went wrong.
 * Return false if there'er still more data expected.
 */
static bool
processResults(CdbDispatchResult *dispatchResult)
{
	SegmentDatabaseDescriptor *segdbDesc = dispatchResult->segdbDesc;
	char	   *msg;

	/*
	 * Receive input from QE.
	 */
	if (PQconsumeInput(segdbDesc->conn) == 0)
	{
		msg = PQerrorMessage(segdbDesc->conn);
		cdbdisp_appendMessageNonThread(dispatchResult, LOG,
									   "Error on receive from %s: %s",
									   segdbDesc->whoami, msg ? msg : "unknown error");
		return true;
	}
	forwardQENotices();

	/*
	 * If we have received one or more complete messages, process them.
	 */
	while (!PQisBusy(segdbDesc->conn))
	{
		/* loop to call PQgetResult; won't block */
		PGresult   *pRes;
		ExecStatusType resultStatus;
		int			resultIndex;

		forwardQENotices();

		/*
		 * PQisBusy() does some error handling, which can cause the connection
		 * to die -- we can't just continue on as if the connection is happy
		 * without checking first.
		 *
		 * For example, cdbdisp_numPGresult() will return a completely bogus
		 * value!
		 */
		if (cdbconn_isBadConnection(segdbDesc))
		{
			msg = PQerrorMessage(segdbDesc->conn);
			cdbdisp_appendMessageNonThread(dispatchResult, LOG,
										   "Connection lost when receiving from %s: %s",
										   segdbDesc->whoami, msg ? msg : "unknown error");
			return true;
		}

		/*
		 * Get one message.
		 */
		ELOG_DISPATCHER_DEBUG("PQgetResult");
		pRes = PQgetResult(segdbDesc->conn);

		/*
		 * Command is complete when PGgetResult() returns NULL. It is critical
		 * that for any connection that had an asynchronous command sent thru
		 * it, we call PQgetResult until it returns NULL. Otherwise, the next
		 * time a command is sent to that connection, it will return an error
		 * that there's a command pending.
		 */
		if (!pRes)
		{
			ELOG_DISPATCHER_DEBUG("%s -> idle", segdbDesc->whoami);
			/* this is normal end of command */
			return true;
		}

		if (segdbDesc->conn->wrote_xlog)
		{
			MarkTopTransactionWriteXLogOnExecutor();

			/*
			 * Reset the worte_xlog here. Since if the received pgresult not process
			 * the xlog write message('x' message sends from QE in ReadyForQuery),
			 * the value may still refer to previous dispatch statement. Which may
			 * always mark current top transaction has wrote xlog on executor.
			 */
			segdbDesc->conn->wrote_xlog = false;
		}

		/*
		 * Attach the PGresult object to the CdbDispatchResult object.
		 */
		resultIndex = cdbdisp_numPGresult(dispatchResult);
		cdbdisp_appendResult(dispatchResult, pRes);

		/*
		 * Did a command complete successfully?
		 */
		resultStatus = PQresultStatus(pRes);
		if (resultStatus == PGRES_COMMAND_OK ||
			resultStatus == PGRES_TUPLES_OK ||
			resultStatus == PGRES_COPY_IN ||
			resultStatus == PGRES_COPY_OUT ||
			resultStatus == PGRES_EMPTY_QUERY)
		{
			ELOG_DISPATCHER_DEBUG("%s -> ok %s",
								  segdbDesc->whoami,
								  PQcmdStatus(pRes) ? PQcmdStatus(pRes) : "(no cmdStatus)");

			if (resultStatus == PGRES_EMPTY_QUERY)
				ELOG_DISPATCHER_DEBUG("QE received empty query.");

			/*
			 * Save the index of the last successful PGresult. Can be given to
			 * cdbdisp_getPGresult() to get tuple count, etc.
			 */
			dispatchResult->okindex = resultIndex;

			/*
			 * SREH - get number of rows rejected from QE if any
			 */
			if (pRes->numRejected > 0)
				dispatchResult->numrowsrejected += pRes->numRejected;

			/*
			 * COPY FROM - get the number of rows completed by QE
			 * if any
			 */
			if (pRes->numCompleted > 0)
				dispatchResult->numrowscompleted += pRes->numCompleted;

			if (resultStatus == PGRES_COPY_IN ||
				resultStatus == PGRES_COPY_OUT)
				return true;
		}

		/*
		 * Note QE error. Cancel the whole statement if requested.
		 */
		else
		{
			/* QE reported an error */
			char	   *sqlstate = PQresultErrorField(pRes, PG_DIAG_SQLSTATE);
			int			errcode = 0;

			msg = PQresultErrorMessage(pRes);

			ELOG_DISPATCHER_DEBUG("%s -> %s %s  %s",
								  segdbDesc->whoami,
								  PQresStatus(resultStatus),
								  sqlstate ? sqlstate : "(no SQLSTATE)",
								  msg);

			/*
			 * Convert SQLSTATE to an error code (ERRCODE_xxx). Use a generic
			 * nonzero error code if no SQLSTATE.
			 */
			if (sqlstate && strlen(sqlstate) == 5)
				errcode = sqlstate_to_errcode(sqlstate);

			/*
			 * Save first error code and the index of its PGresult buffer
			 * entry.
			 */
			cdbdisp_seterrcode(errcode, resultIndex, dispatchResult);
		}
	}

	forwardQENotices();

	PGnotify *qnotifies = PQnotifies(segdbDesc->conn);
	while(qnotifies && elog_geterrcode() == 0)
	{
		if (strcmp(qnotifies->relname, CDB_NOTIFY_NEXTVAL) == 0)
		{
			/*
			 * If there was nextval request then respond back on this libpq
			 * connection with the next value. Check and process nextval
			 * message only if QD has not already hit the error. Since QD could
			 * have hit the error while processing the previous nextval_qd()
			 * request itself and since full error handling is not complete yet
			 * (ex: releasing all the locks, etc.), shouldn't attempt to call
			 * nextval_qd() again.
			 */

			CHECK_FOR_INTERRUPTS();

			int64 last;
			int64 cached;
			int64 increment;
			bool overflow;
			Oid dbid;
			Oid seq_oid;

			if (sscanf(qnotifies->extra, "%u:%u", &dbid, &seq_oid) != 2)
				elog(ERROR, "invalid nextval message");

			if (dbid != MyDatabaseId)
				elog(ERROR, "nextval message database id:%u doesn't match my database id:%u",
					 dbid, MyDatabaseId);

			PG_TRY();
			{
				nextval_qd(seq_oid, &last, &cached, &increment, &overflow);
			}
			PG_CATCH();
			{
				send_sequence_response(segdbDesc->conn, seq_oid, last, cached, increment, overflow, true /* error */);
				PG_RE_THROW();
			}
			PG_END_TRY();
			/* respond back on this libpq connection with the next value */
			send_sequence_response(segdbDesc->conn, seq_oid, last, cached, increment, overflow, false /* error */);
		}
		else if (strcmp(qnotifies->relname, CDB_NOTIFY_ENDPOINT_ACK) == 0)
		{
			qnotifies->next = (struct pgNotify *) dispatchResult->ackPGNotifies;
			dispatchResult->ackPGNotifies = (struct PGnotify *) qnotifies;

			/* Don't free the notify here since it in queue now */
			qnotifies = NULL;
		}
		else
		{
			/* Got an unknown PGnotify, just record it in log */
			if (qnotifies->relname)
				elog(LOG, "got an unknown notify message : %s", qnotifies->relname);
		}

		if (qnotifies)
			PQfreemem(qnotifies);
		qnotifies = PQnotifies(segdbDesc->conn);
	}

	forwardQENotices();

	return false;				/* we must keep on monitoring this socket */
}

相关信息

greenplumn 源码目录

相关文章

greenplumn cdbconn 源码

greenplumn cdbdisp 源码

greenplumn cdbdisp_dtx 源码

greenplumn cdbdisp_query 源码

greenplumn cdbdispatchresult 源码

greenplumn cdbgang 源码

greenplumn cdbgang_async 源码

greenplumn cdbpq 源码

0  赞