greenplum cdbgang_async 源码

  • 2022-08-18
  • 浏览 (732)

greenplum cdbgang_async 代码

文件路径:/src/backend/cdb/dispatcher/cdbgang_async.c

/*-------------------------------------------------------------------------
 *
 * cdbgang_async.c
 *	  Functions for asynchronous implementation of creating gang.
 *
 * Portions Copyright (c) 2005-2008, Greenplum inc
 * Portions Copyright (c) 2012-Present VMware, Inc. or its affiliates.
 *
 *
 * IDENTIFICATION
 *	    src/backend/cdb/dispatcher/cdbgang_async.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include <limits.h>
#ifdef HAVE_POLL_H
#include <poll.h>
#endif
#ifdef HAVE_SYS_POLL_H
#include <sys/poll.h>
#endif

#include "access/xact.h"
#include "storage/ipc.h"		/* For proc_exit_inprogress  */
#include "pgstat.h"
#include "tcop/tcopprot.h"
#include "libpq-fe.h"
#include "libpq-int.h"
#include "cdb/cdbfts.h"
#include "cdb/cdbgang.h"
#include "cdb/cdbgang_async.h"
#include "cdb/cdbtm.h"
#include "cdb/cdbvars.h"
#include "miscadmin.h"

static int	getPollTimeout(const struct timeval *startTS);

/*
 * cdbgang_createGang_async
 *
 * Creates a new gang by logging on a session to each segDB involved.  All
 * connection attempts are started first and then driven to completion
 * together in a single polling loop.
 *
 * Call this function in GangContext memory context.
 *
 * Parameters:
 *	segments	- list handed to buildGangDefinition(); its length is the
 *				  number of descriptors in the gang.
 *	segmentType	- handed to buildGangDefinition() unchanged.
 *
 * Returns a non-NULL gang, or raises ERROR.
 *
 * Outline:
 *	1. Build the gang definition and publish it in CurrentGangCreating.
 *	2. Inside a transaction, error out if FTS reports any gang segment down.
 *	3. Start a connection for every descriptor that has no usable cached
 *	   connection.
 *	4. Poll until every descriptor is either connected or known to be in
 *	   reset/recovery mode, or until getPollTimeout() reports expiry.
 *	5. If some segments were in recovery, sleep and go back to step 3, up to
 *	   gp_gang_creation_retry_count times.
 *
 * CurrentGangCreating is cleared only on the successful return path; the
 * error paths in this function leave it set.
 */
Gang *
cdbgang_createGang_async(List *segments, SegmentType segmentType)
{
	PostgresPollingStatusType	*pollingStatus = NULL;
	SegmentDatabaseDescriptor	*segdbDesc = NULL;
	struct timeval	startTS;
	Gang	*newGangDefinition;
	int		create_gang_retry_counter = 0;
	int		in_recovery_mode_count = 0;
	int		successful_connections = 0;
	int		poll_timeout = 0;
	int		i = 0;
	int		size = 0;
	int		totalSegs = 0;
	bool	allStatusDone = true;
	bool	retry = false;

	/*
	 * volatile because it is assigned inside PG_TRY and read in PG_CATCH,
	 * where it must still be valid so the set can be freed.
	 */
	WaitEventSet    *volatile gang_waitset = NULL;
	/* the returned events of waiteventset */
	WaitEvent		*revents = NULL;
	/* true means connection status is confirmed, either established or in recovery mode */
	bool		*connStatusDone = NULL;

	size = list_length(segments);

	ELOG_DISPATCHER_DEBUG("createGang size = %d, segment type = %d", size, segmentType);
	Assert(CurrentGangCreating == NULL);

	/*
	 * Start from a clean pointer.  NOTE(review): this statement sits above
	 * the create_gang_retry label, so it runs once and is not repeated when
	 * gang creation is retried.
	 */
	newGangDefinition = NULL;
	/* allocate and initialize a gang structure */
	newGangDefinition = buildGangDefinition(segments, segmentType);
	CurrentGangCreating = newGangDefinition;
	/*
	 * If we're in a global transaction, and there is some segment configuration change,
	 * we have to error out so that the current global transaction can be aborted.
	 * This is because within a transaction we use the cached version of the configuration
	 * information obtained at start of transaction, which we can't update in the middle of
	 * the transaction, so QD will still talk to the old primary but not a newly promoted one.
	 * This isn't an issue if the old primary is completely down since we'll find a FATAL
	 * error during communication, but becomes an issue if the old primary is working and
	 * acting like normal to QD.
	 *
	 * Before erroring out, we need to reset the session instead of calling
	 * disconnectAndDestroyAllGangs.  The latter will drop CdbComponentsContext, which we
	 * will use in AtAbort_Portals.  Because some primary segment is down, writerGangLost
	 * will be marked when recycling gangs, and all gangs will be destroyed by ResetAllGangs
	 * in AtAbort_DispatcherState.
	 *
	 * We shouldn't error out in transaction abort state to avoid recursive abort.
	 * In such case, the dispatcher would catch the error and then dtm does (retry)
	 * abort.
	 */
	if (IsTransactionState())
	{
		for (i = 0; i < size; i++)
		{
			if (FtsIsSegmentDown(newGangDefinition->db_descriptors[i]->segment_database_info))
			{
				resetSessionForPrimaryGangLoss();
				elog(ERROR, "gang was lost due to cluster reconfiguration");
			}
		}
	}
	totalSegs = getgpsegmentCount();
	Assert(totalSegs > 0);

	/*
	 * Re-entry point for a retry.  The counters and the three work arrays are
	 * reset/reallocated on every pass; create_gang_retry_counter is not, so
	 * it keeps counting across passes.
	 */
create_gang_retry:
	Assert(newGangDefinition != NULL);
	Assert(newGangDefinition->size == size);
	successful_connections = 0;
	in_recovery_mode_count = 0;
	retry = false;

	/* one slot per gang descriptor; freed after PG_END_TRY on the success path */
	pollingStatus = palloc(sizeof(PostgresPollingStatusType) * size);
	connStatusDone = palloc(sizeof(bool) * size);
	revents = palloc(sizeof(WaitEvent) * size);

	PG_TRY();
	{
		/* Phase 1: start a connection attempt for every descriptor that needs one. */
		for (i = 0; i < size; i++)
		{
			bool		ret;
			char		gpqeid[100];
			char	   *options = NULL;
			char	   *diff_options = NULL;

			/*
			 * Create the connection requests.	If we find a segment without a
			 * valid segdb we error out.  Also, if this segdb is invalid, we
			 * must fail the connection.
			 */
			segdbDesc = newGangDefinition->db_descriptors[i];

			/*
			 * If it's a cached QE, skip.  Note that pollingStatus[i] is left
			 * unassigned for such entries.
			 */
			if (segdbDesc->conn != NULL && !cdbconn_isBadConnection(segdbDesc))
			{
				connStatusDone[i] = true;
				successful_connections++;
				continue;
			}

			/*
			 * Build the connection string.  Writer-ness needs to be processed
			 * early enough now some locks are taken before command line
			 * options are recognized.
			 */
			ret = build_gpqeid_param(gpqeid, sizeof(gpqeid),
									 segdbDesc->isWriter,
									 segdbDesc->identifier,
									 segdbDesc->segment_database_info->hostPrimaryCount,
									 totalSegs * 2);

			if (!ret)
				ereport(ERROR,
						(errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
						 errmsg("failed to construct connectionstring")));

			makeOptions(&options, &diff_options);

			/* start connection in asynchronous way */
			cdbconn_doConnectStart(segdbDesc, gpqeid, options, diff_options);

			if (cdbconn_isBadConnection(segdbDesc))
				ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
								errmsg("failed to acquire resources on one or more segments"),
								errdetail("%s (%s)", PQerrorMessage(segdbDesc->conn), segdbDesc->whoami)));

			connStatusDone[i] = false;

			/*
			 * If connection status is not CONNECTION_BAD after
			 * PQconnectStart(), we must act as if the PQconnectPoll() had
			 * returned PGRES_POLLING_WRITING
			 */
			pollingStatus[i] = PGRES_POLLING_WRITING;
		}

		/*
		 * Ok, we've now launched all the connection attempts. Start the
		 * timeout clock (= get the start timestamp), and poll until they're
		 * all completed or we reach timeout.
		 */
		gettimeofday(&startTS, NULL);

		/* Phase 2: drive every pending connection until it is done. */
		for (;;)
		{
			/*
			 * 0 means the connect timeout has expired.
			 *
			 * NOTE(review): segdbDesc here is simply the descriptor that was
			 * assigned most recently, so the name reported in the detail is
			 * not necessarily the segment that failed to connect in time.
			 */
			poll_timeout = getPollTimeout(&startTS);
			if (poll_timeout == 0)
				ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
								errmsg("failed to acquire resources on one or more segments"),
								errdetail("timeout expired\n (%s)", segdbDesc->whoami)));

			/*
			 * GPDB_12_MERGE_FIXME: create and destroy waiteventset in each loop
			 * may impact the performance, please see:
			 * https://github.com/greenplum-db/gpdb/pull/13494#discussion_r874243725
			 * Let's verify it later.
			 */
			/*
			 * Since the set of FDs can change when we call PQconnectPoll() below,
			 * create a new wait event set to poll on for every loop iteration.
			 */
			gang_waitset = CreateWaitEventSet(CurrentMemoryContext, size);

			/*
			 * Act on the polling status recorded for each pending descriptor:
			 * finish it, register its socket for the next wait, or fail.
			 */
			for (i = 0; i < size; i++)
			{
				segdbDesc = newGangDefinition->db_descriptors[i];
				int fd = PQsocket(segdbDesc->conn);

				/*
				 * NOTE(review): this runs before the connStatusDone check, so
				 * for cached QEs it prints a pollingStatus[i] that was never
				 * assigned.
				 */
				ELOG_DISPATCHER_DEBUG("event pollingStatus, i:%d fd:%d conn-status:%d polling-status:%d",
					i, fd, connStatusDone[i], pollingStatus[i]);

				/*
				 * Skip established connections and in-recovery-mode
				 * connections
				 */
				if (connStatusDone[i])
					continue;

				switch (pollingStatus[i])
				{
					case PGRES_POLLING_OK:
						/* handshake finished: complete setup and count it */
						cdbconn_doConnectComplete(segdbDesc);
						if (segdbDesc->motionListener == 0)
							ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
											errmsg("failed to acquire resources on one or more segments"),
											errdetail("Internal error: No motion listener port (%s)", segdbDesc->whoami)));
						successful_connections++;
						connStatusDone[i] = true;

						continue;

					case PGRES_POLLING_READING:
						AddWaitEventToSet(gang_waitset, WL_SOCKET_READABLE, fd, NULL,
							(void *)(long)i); /* "i" as the event's userdata */

						ELOG_DISPATCHER_DEBUG("added readable event into waitset, i:%d fd:%d", i, fd);
						break;

					case PGRES_POLLING_WRITING:
						AddWaitEventToSet(gang_waitset, WL_SOCKET_WRITEABLE, fd, NULL,
							(void *)(long)i); /* "i" as the event's userdata */

						ELOG_DISPATCHER_DEBUG("added writable event into waitset, i:%d fd:%d", i, fd);
						break;

					case PGRES_POLLING_FAILED:
						/*
						 * A segment in reset/recovery mode is recorded as
						 * "done" so the loop can finish and the whole gang
						 * can be retried; any other failure is fatal.
						 */
						if (segment_failure_due_to_recovery(PQerrorMessage(segdbDesc->conn)))
						{
							in_recovery_mode_count++;
							connStatusDone[i] = true;
							elog(LOG, "segment is in reset/recovery mode (%s)", segdbDesc->whoami);
						}
						else
						{
							if (segment_failure_due_to_missing_writer(PQerrorMessage(segdbDesc->conn)))
								markCurrentGxactWriterGangLost();
							ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
											errmsg("failed to acquire resources on one or more segments"),
											errdetail("%s (%s)", PQerrorMessage(segdbDesc->conn), segdbDesc->whoami)));
						}
						break;

					default:
						ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
										errmsg("failed to acquire resources on one or more segments"),
										errdetail("unknown pollstatus (%s)", segdbDesc->whoami)));
						break;
				}
			}

			/* leave the loop once every descriptor has a confirmed status */
			allStatusDone = true;
			for (i = 0; i < size; i++)
				allStatusDone &= connStatusDone[i];
			if (allStatusDone)
			{
				FreeWaitEventSet(gang_waitset);
				gang_waitset = NULL;
				break;
			}

			SIMPLE_FAULT_INJECTOR("create_gang_in_progress");

			CHECK_FOR_INTERRUPTS();

			/* Wait until something happens */
			int nready = WaitEventSetWait(gang_waitset, poll_timeout, revents, size, WAIT_EVENT_GANG_ASSIGN);
			Assert(nready >= 0);
			/*
			 * The set is freed right away and the pointer cleared, so
			 * PG_CATCH never frees it a second time.
			 */
			FreeWaitEventSet(gang_waitset);
			gang_waitset = NULL;

			if (nready == 0)
			{
				/* nothing became ready; the next iteration re-evaluates the timeout */
				ELOG_DISPATCHER_DEBUG("cdbgang_createGang_async(): WaitEventSetWait timeout after %d ms", poll_timeout);
			}
			else if (nready > 0)
			{
				for (i = 0; i < nready; i++)
				{
					/*
					 * original position in the db_descriptors
					 *
					 * NOTE(review): the index is stored and recovered through
					 * a cast to long; that assumes long can round-trip
					 * through a pointer on the target platform.
					 */
					long pos = (long)(revents[i].user_data);
					if (connStatusDone[pos])
						continue;

					segdbDesc = newGangDefinition->db_descriptors[pos];
					int fd_desc PG_USED_FOR_ASSERTS_ONLY = PQsocket(segdbDesc->conn);
					Assert(fd_desc > 0);
					Assert(fd_desc == revents[i].fd);

					ELOG_DISPATCHER_DEBUG("ready event[%d] pos:%ld fd:%d event:%d",
						i, pos, revents[i].fd, revents[i].events);

					if (revents[i].events & WL_SOCKET_WRITEABLE ||
						revents[i].events & WL_SOCKET_READABLE)
						/*
						 * The official documentation says:
						 * Caution: do not assume that the socket remains the same across PQconnectPoll calls.
						 *
						 * So must add all sock FDs to waiteventset again in the next loop.
						 */
						pollingStatus[pos] = PQconnectPoll(segdbDesc->conn);
				}
			}
		} /* for(;;) */

		ELOG_DISPATCHER_DEBUG("createGang: %d processes requested; %d successful connections %d in recovery",
							  size, successful_connections, in_recovery_mode_count);

		/* some segments are in reset/recovery mode */
		if (successful_connections != size)
		{
			Assert(successful_connections + in_recovery_mode_count == size);

			/*
			 * Give up when retries are disabled or the retry budget is
			 * used up; the counter is incremented as part of the test.
			 */
			if (gp_gang_creation_retry_count <= 0 ||
				create_gang_retry_counter++ >= gp_gang_creation_retry_count)
				ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
								errmsg("failed to acquire resources on one or more segments"),
								errdetail("Segments are in reset/recovery mode.")));

			ELOG_DISPATCHER_DEBUG("createGang: gang creation failed, but retryable.");

			retry = true;
		}
	}
	PG_CATCH();
	{
		/* release a wait set that was still live when the error was raised */
		if (gang_waitset != NULL)
			FreeWaitEventSet(gang_waitset);

		FtsNotifyProber();
		/*
		 * FTS shows some segment DBs are down: report that instead of the
		 * error that brought us here.
		 */
		if (FtsTestSegmentDBIsDown(newGangDefinition->db_descriptors, size))
		{
			ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
							errmsg("failed to acquire resources on one or more segments"),
							errdetail("FTS detected one or more segments are down")));

		}

		/* otherwise propagate the original error unchanged */
		PG_RE_THROW();
	}
	PG_END_TRY();

	/* the work arrays are reallocated at create_gang_retry on every pass */
	pfree(pollingStatus);
	pfree(connStatusDone);
	pfree(revents);

	SIMPLE_FAULT_INJECTOR("gang_created");

	if (retry)
	{
		/* pause between attempts, staying responsive to interrupts on both sides */
		CHECK_FOR_INTERRUPTS();
		pg_usleep(gp_gang_creation_retry_timer * 1000);
		CHECK_FOR_INTERRUPTS();

		goto create_gang_retry;
	}

	CurrentGangCreating = NULL;

	return newGangDefinition;
}

static int
getPollTimeout(const struct timeval *startTS)
{
	struct timeval now;
	int			timeout = 0;
	int64		diff_us;

	gettimeofday(&now, NULL);

	if (gp_segment_connect_timeout > 0)
	{
		diff_us = (now.tv_sec - startTS->tv_sec) * 1000000;
		diff_us += (int) now.tv_usec - (int) startTS->tv_usec;
		if (diff_us >= (int64) gp_segment_connect_timeout * 1000000)
			timeout = 0;
		else
			timeout = gp_segment_connect_timeout * 1000 - diff_us / 1000;
	}
	else
		/* wait forever */
		timeout = -1;

	return timeout;
}

相关信息

greenplum 源码目录

相关文章

greenplum cdbconn 源码

greenplum cdbdisp 源码

greenplum cdbdisp_async 源码

greenplum cdbdisp_dtx 源码

greenplum cdbdisp_query 源码

greenplum cdbdispatchresult 源码

greenplum cdbgang 源码

greenplum cdbpq 源码

0  赞