greenplumn cdbutil source code

  • 2022-08-18

greenplumn cdbutil code

File path: /src/backend/cdb/cdbutil.c

/*-------------------------------------------------------------------------
 *
 * cdbutil.c
 *	  Internal utility support functions for Greenplum Database/PostgreSQL.
 *
 * Portions Copyright (c) 2005-2011, Greenplum inc
 * Portions Copyright (c) 2012-Present VMware, Inc. or its affiliates.
 *
 *
 * IDENTIFICATION
 *	    src/backend/cdb/cdbutil.c
 *
 * NOTES
 *
 *	- According to src/backend/executor/execHeapScan.c
 *		"tuples returned by heap_getnext() are pointers onto disk
 *		pages and were not created with palloc() and so should not
 *		be pfree()'d"
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#ifdef HAVE_SYS_RESOURCE_H
#include <sys/time.h>
#include <sys/resource.h>
#endif

#include <sys/param.h>			/* for MAXHOSTNAMELEN */

#include "access/genam.h"
#include "catalog/gp_segment_configuration.h"
#include "common/ip.h"
#include "nodes/makefuncs.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/memutils.h"
#include "catalog/gp_id.h"
#include "catalog/indexing.h"
#include "cdb/cdbhash.h"
#include "cdb/cdbutil.h"
#include "cdb/cdbmotion.h"
#include "cdb/cdbvars.h"
#include "cdb/cdbgang.h"
#include "cdb/cdbdisp_query.h"
#include "cdb/ml_ipc.h"			/* listener_setup */
#include "cdb/cdbtm.h"
#include "libpq-fe.h"
#include "libpq-int.h"
#include "cdb/cdbconn.h"
#include "cdb/cdbfts.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "postmaster/fts.h"
#include "postmaster/postmaster.h"
#include "catalog/namespace.h"
#include "utils/gpexpand.h"
#include "access/xact.h"

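/*
 * Cap on the number of idle reader QEs cached on the entry db's
 * (segindex == -1) freelist; see cdbcomponent_recycleIdleQE().
 */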
#define MAX_CACHED_1_GANGS 1

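/*
 * Bump/drop a counter field on both the component (cdbinfo) and its parent
 * CdbComponentDatabases, keeping the per-component and aggregate counts in
 * sync.
 */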
#define INCR_COUNT(cdbinfo, arg) \
	(cdbinfo)->arg++; \
	(cdbinfo)->cdbs->arg++;

#define DECR_COUNT(cdbinfo, arg) \
	(cdbinfo)->arg--; \
	(cdbinfo)->cdbs->arg--; \
	Assert((cdbinfo)->arg >= 0); \
	Assert((cdbinfo)->cdbs->arg >= 0); \

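/*
 * Flat-file dump of gp_segment_configuration, used when the catalog cannot
 * be read (outside a transaction).  Each line carries GPSEGCONFIGNUMATTR
 * fields: dbid content role preferred_role mode status port hostname address.
 */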
#define GPSEGCONFIGDUMPFILE "gpsegconfig_dump"
#define GPSEGCONFIGDUMPFILETMP "gpsegconfig_dump_tmp"
#define GPSEGCONFIGNUMATTR 9 

MemoryContext CdbComponentsContext = NULL;
static CdbComponentDatabases *cdb_component_dbs = NULL;

/*
 * Helper Functions
 */
static CdbComponentDatabases *getCdbComponentInfo(void);
static void cleanupComponentIdleQEs(CdbComponentDatabaseInfo *cdi, bool includeWriter);

static int	CdbComponentDatabaseInfoCompare(const void *p1, const void *p2);

static GpSegConfigEntry * readGpSegConfigFromCatalog(int *total_dbs);
static GpSegConfigEntry * readGpSegConfigFromFTSFiles(int *total_dbs);

static void getAddressesForDBid(GpSegConfigEntry *c, int elevel);
static HTAB *hostPrimaryCountHashTableInit(void);

static int nextQEIdentifer(CdbComponentDatabases *cdbs);

static HTAB *segment_ip_cache_htab = NULL;

int numsegmentsFromQD = -1;

typedef struct SegIpEntry
{
	char		key[NAMEDATALEN];
	char		hostinfo[NI_MAXHOST];
} SegIpEntry;

typedef struct HostPrimaryCountEntry
{
	char		hostname[MAXHOSTNAMELEN];
	int			segmentCount;
} HostPrimaryCountEntry;

/*
 * Helper functions for fetching the latest gp_segment_configuration outside
 * of a transaction.
 *
 * In phase 2 of 2PC, after the current xact has been marked TRANS_COMMIT or
 * TRANS_ABORT, the COMMIT_PREPARED or ABORT_PREPARED DTM operations are
 * performed.  If they fail, the dispatcher disconnects and destroys all gangs
 * and fetches the latest segment configuration to do RETRY_COMMIT_PREPARED or
 * RETRY_ABORT_PREPARED; however, postgres disallows catalog lookups outside
 * of a transaction.
 *
 * readGpSegConfigFromFTSFiles() notifies FTS to dump the configs from the
 * catalog to a flat file and then reads the configuration from that file.
 */
static GpSegConfigEntry *
readGpSegConfigFromFTSFiles(int *total_dbs)
{
	FILE	*fd;
	int		idx = 0;
	int		array_size = 500;
	GpSegConfigEntry *configs = NULL;
	GpSegConfigEntry *config = NULL;

	char	hostname[MAXHOSTNAMELEN];
	char	address[MAXHOSTNAMELEN];
	char	buf[MAXHOSTNAMELEN * 2 + 32];

	Assert(!IsTransactionState());

	/* notify and wait FTS to finish a probe and update the dump file */
	FtsNotifyProber();	

	fd = AllocateFile(GPSEGCONFIGDUMPFILE, "r");

	if (!fd)
		elog(ERROR, "could not open gp_segment_configutation dump file:%s:%m", GPSEGCONFIGDUMPFILE);

	configs = palloc0(sizeof (GpSegConfigEntry) * array_size); 

	while (fgets(buf, sizeof(buf), fd))
	{ 
		config = &configs[idx];

		if (sscanf(buf, "%d %d %c %c %c %c %d %s %s", (int *)&config->dbid, (int *)&config->segindex,
				   &config->role, &config->preferred_role, &config->mode, &config->status,
				   &config->port, hostname, address) != GPSEGCONFIGNUMATTR)
		{
			FreeFile(fd);
			elog(ERROR, "invalid data in gp_segment_configuration dump file: %s:%m", GPSEGCONFIGDUMPFILE);
		}

		config->hostname = pstrdup(hostname);
		config->address = pstrdup(address);

		idx++;
		/*
		 * Expand the GpSegConfigEntry array if we've used up the
		 * currently allocated space.
		 */
		if (idx >= array_size)
		{
			array_size = array_size * 2;
			configs = (GpSegConfigEntry *)
				repalloc(configs, sizeof(GpSegConfigEntry) * array_size);
		}
	}

	FreeFile(fd);

	*total_dbs = idx;
	return configs;
}

/*
 * writeGpSegConfigToFTSFiles() dumps gp_segment_configuration to the file
 * GPSEGCONFIGDUMPFILE in $PGDATA; only the FTS process may use this function.
 *
 * The contents are written to GPSEGCONFIGDUMPFILETMP first and then renamed
 * to GPSEGCONFIGDUMPFILE, which allows lockless concurrent reads and writes.
 */
void
writeGpSegConfigToFTSFiles(void)
{
	FILE	*fd;
	int		idx = 0;
	int		total_dbs = 0;
	GpSegConfigEntry *configs = NULL;
	GpSegConfigEntry *config = NULL;

	Assert(IsTransactionState());
	Assert(am_ftsprobe);

	fd = AllocateFile(GPSEGCONFIGDUMPFILETMP, "w+");

	if (!fd)
		elog(ERROR, "could not create tmp file: %s: %m", GPSEGCONFIGDUMPFILETMP);

	configs = readGpSegConfigFromCatalog(&total_dbs); 

	for (idx = 0; idx < total_dbs; idx++)
	{
		config = &configs[idx];

		if (fprintf(fd, "%d %d %c %c %c %c %d %s %s\n", config->dbid, config->segindex,
					config->role, config->preferred_role, config->mode, config->status,
					config->port, config->hostname, config->address) < 0)
		{
			FreeFile(fd);
			elog(ERROR, "could not dump gp_segment_configuration to file: %s: %m", GPSEGCONFIGDUMPFILE);
		}
	}

	FreeFile(fd);

	/* rename tmp file to permanent file */
	if (rename(GPSEGCONFIGDUMPFILETMP, GPSEGCONFIGDUMPFILE) != 0)
		elog(ERROR, "could not rename file %s to file %s: %m",
			 GPSEGCONFIGDUMPFILETMP, GPSEGCONFIGDUMPFILE);
}

static GpSegConfigEntry *
readGpSegConfigFromCatalog(int *total_dbs)
{
	int					idx = 0;
	int					array_size;
	bool				isNull;
	Datum				attr;
	Relation			gp_seg_config_rel;
	HeapTuple			gp_seg_config_tuple = NULL;
	SysScanDesc			gp_seg_config_scan;
	GpSegConfigEntry	*configs;
	GpSegConfigEntry	*config;

	array_size = 500;
	configs = palloc0(sizeof(GpSegConfigEntry) * array_size);

	gp_seg_config_rel = table_open(GpSegmentConfigRelationId, AccessShareLock);
	gp_seg_config_scan = systable_beginscan(gp_seg_config_rel, InvalidOid, false, NULL,
											0, NULL);

	while (HeapTupleIsValid(gp_seg_config_tuple = systable_getnext(gp_seg_config_scan)))
	{
		config = &configs[idx];

		/* dbid */
		attr = heap_getattr(gp_seg_config_tuple, Anum_gp_segment_configuration_dbid, RelationGetDescr(gp_seg_config_rel), &isNull);
		Assert(!isNull);
		config->dbid = DatumGetInt16(attr);

		/* content */
		attr = heap_getattr(gp_seg_config_tuple, Anum_gp_segment_configuration_content, RelationGetDescr(gp_seg_config_rel), &isNull);
		Assert(!isNull);
		config->segindex= DatumGetInt16(attr);

		/* role */
		attr = heap_getattr(gp_seg_config_tuple, Anum_gp_segment_configuration_role, RelationGetDescr(gp_seg_config_rel), &isNull);
		Assert(!isNull);
		config->role = DatumGetChar(attr);

		/* preferred-role */
		attr = heap_getattr(gp_seg_config_tuple, Anum_gp_segment_configuration_preferred_role, RelationGetDescr(gp_seg_config_rel), &isNull);
		Assert(!isNull);
		config->preferred_role = DatumGetChar(attr);

		/* mode */
		attr = heap_getattr(gp_seg_config_tuple, Anum_gp_segment_configuration_mode, RelationGetDescr(gp_seg_config_rel), &isNull);
		Assert(!isNull);
		config->mode = DatumGetChar(attr);

		/* status */
		attr = heap_getattr(gp_seg_config_tuple, Anum_gp_segment_configuration_status, RelationGetDescr(gp_seg_config_rel), &isNull);
		Assert(!isNull);
		config->status = DatumGetChar(attr);

		/* hostname */
		attr = heap_getattr(gp_seg_config_tuple, Anum_gp_segment_configuration_hostname, RelationGetDescr(gp_seg_config_rel), &isNull);
		Assert(!isNull);
		config->hostname = TextDatumGetCString(attr);

		/* address */
		attr = heap_getattr(gp_seg_config_tuple, Anum_gp_segment_configuration_address, RelationGetDescr(gp_seg_config_rel), &isNull);
		Assert(!isNull);
		config->address = TextDatumGetCString(attr);

		/* port */
		attr = heap_getattr(gp_seg_config_tuple, Anum_gp_segment_configuration_port, RelationGetDescr(gp_seg_config_rel), &isNull);
		Assert(!isNull);
		config->port = DatumGetInt32(attr);

		/* datadir is not dumped */

		idx++;

		/*
		 * Expand the GpSegConfigEntry array if we've used up the
		 * currently allocated space.
		 */
		if (idx >= array_size)
		{
			array_size = array_size * 2;
			configs = (GpSegConfigEntry *)
				repalloc(configs, sizeof(GpSegConfigEntry) * array_size);
		}
	}

	/*
	 * We're done with the catalog config, clean them up, closing all the
	 * relations we opened.
	 */
	systable_endscan(gp_seg_config_scan);
	table_close(gp_seg_config_rel, AccessShareLock);

	*total_dbs = idx;
	return configs;
}

/*
 *  Internal function to initialize each component info
 */
static CdbComponentDatabases *
getCdbComponentInfo(void)
{
	MemoryContext oldContext;
	CdbComponentDatabaseInfo *cdbInfo;
	CdbComponentDatabases *component_databases = NULL;
	GpSegConfigEntry *configs;

	int			i;
	int			x = 0;
	int			total_dbs = 0;

	bool		found;
	HostPrimaryCountEntry *hsEntry;

	if (!CdbComponentsContext)
		CdbComponentsContext = AllocSetContextCreate(TopMemoryContext, "cdb components Context",
								ALLOCSET_DEFAULT_MINSIZE,
								ALLOCSET_DEFAULT_INITSIZE,
								ALLOCSET_DEFAULT_MAXSIZE);

	oldContext = MemoryContextSwitchTo(CdbComponentsContext);

	HTAB	   *hostPrimaryCountHash = hostPrimaryCountHashTableInit();

	if (IsTransactionState())
		configs = readGpSegConfigFromCatalog(&total_dbs);
	else
		configs = readGpSegConfigFromFTSFiles(&total_dbs);

	component_databases = palloc0(sizeof(CdbComponentDatabases));

	component_databases->numActiveQEs = 0;
	component_databases->numIdleQEs = 0;
	component_databases->qeCounter = 0;
	component_databases->freeCounterList = NIL;

	component_databases->segment_db_info =
		(CdbComponentDatabaseInfo *) palloc0(sizeof(CdbComponentDatabaseInfo) * total_dbs);

	component_databases->entry_db_info =
		(CdbComponentDatabaseInfo *) palloc0(sizeof(CdbComponentDatabaseInfo) * 2);

	for (i = 0; i < total_dbs; i++)
	{
		CdbComponentDatabaseInfo	*pRow;
		GpSegConfigEntry	*config = &configs[i];

		if (config->hostname == NULL || strlen(config->hostname) > MAXHOSTNAMELEN)
		{
			/*
			 * We should never reach here, but add a sanity check.  The reason
			 * we check the length is that MAXHOSTNAMELEN might be smaller than
			 * the hostnames defined in /etc/hosts; those are rare cases.
			 */
			elog(ERROR,
				 "Invalid length (%d) of hostname (%s)",
				 config->hostname == NULL ? 0 : (int) strlen(config->hostname),
				 config->hostname == NULL ? "" : config->hostname);
		}

		/* lookup hostip/hostaddrs cache */
		config->hostip= NULL;
		getAddressesForDBid(config, !am_ftsprobe? ERROR : LOG);

		/*
		 * Make sure we get a valid hostip for the primary here; if the
		 * hostip for a mirror cannot be obtained, ignore the error.
		 */
		if (config->hostaddrs[0] == NULL &&
			config->role == GP_SEGMENT_CONFIGURATION_ROLE_PRIMARY)
			ereport(!am_ftsprobe ? ERROR : LOG,
					(errcode(ERRCODE_CONNECTION_FAILURE),
					errmsg("cannot resolve network address for dbid=%d", config->dbid)));

		if (config->hostaddrs[0] != NULL)
			config->hostip = pstrdup(config->hostaddrs[0]);
		AssertImply(config->hostip, strlen(config->hostip) <= INET6_ADDRSTRLEN);

		/*
		 * Determine which array to place this row's data in: entry or segment,
		 * based on the content field.
		 */
		if (config->segindex >= 0)
		{
			pRow = &component_databases->segment_db_info[component_databases->total_segment_dbs];
			component_databases->total_segment_dbs++;
		}
		else
		{
			pRow = &component_databases->entry_db_info[component_databases->total_entry_dbs];
			component_databases->total_entry_dbs++;
		}

		pRow->cdbs = component_databases;
		pRow->config = config;
		pRow->freelist = NIL;
		pRow->activelist = NIL;
		pRow->numIdleQEs = 0;
		pRow->numActiveQEs = 0;

		if (config->role != GP_SEGMENT_CONFIGURATION_ROLE_PRIMARY)
			continue;

		hsEntry = (HostPrimaryCountEntry *) hash_search(hostPrimaryCountHash, config->hostname, HASH_ENTER, &found);
		if (found)
			hsEntry->segmentCount++;
		else
			hsEntry->segmentCount = 1;
	}

	/*
	 * Validate that there exists at least one entry and one segment database
	 * in the configuration
	 */
	if (component_databases->total_segment_dbs == 0)
	{
		ereport(ERROR,
				(errcode(ERRCODE_CARDINALITY_VIOLATION),
				 errmsg("number of segment databases cannot be 0")));
	}
	if (component_databases->total_entry_dbs == 0)
	{
		ereport(ERROR,
				(errcode(ERRCODE_CARDINALITY_VIOLATION),
				 errmsg("number of entry databases cannot be 0")));
	}

	/*
	 * Now sort the data by segindex, isprimary desc
	 */
	qsort(component_databases->segment_db_info,
		  component_databases->total_segment_dbs, sizeof(CdbComponentDatabaseInfo),
		  CdbComponentDatabaseInfoCompare);

	qsort(component_databases->entry_db_info,
		  component_databases->total_entry_dbs, sizeof(CdbComponentDatabaseInfo),
		  CdbComponentDatabaseInfoCompare);

	/*
	 * Now count the number of distinct segindexes. Since it's sorted, this is
	 * easy.
	 */
	for (i = 0; i < component_databases->total_segment_dbs; i++)
	{
		if (i == 0 ||
			(component_databases->segment_db_info[i].config->segindex != component_databases->segment_db_info[i - 1].config->segindex))
		{
			component_databases->total_segments++;
		}
	}

	/*
	 * Now validate that our identity is present in the entry databases
	 */
	for (i = 0; i < component_databases->total_entry_dbs; i++)
	{
		cdbInfo = &component_databases->entry_db_info[i];

		if (cdbInfo->config->dbid == GpIdentity.dbid && cdbInfo->config->segindex == GpIdentity.segindex)
		{
			break;
		}
	}
	if (i == component_databases->total_entry_dbs)
	{
		ereport(ERROR,
				(errcode(ERRCODE_DATA_EXCEPTION),
				 errmsg("cannot locate entry database"),
				 errdetail("Entry database represented by this db in gp_segment_configuration: dbid %d content %d",
						   GpIdentity.dbid, GpIdentity.segindex)));
	}

	/*
	 * Now validate that the segindexes for the segment databases are between
	 * 0 and (numsegments - 1) inclusive, and that we hit them all.
	 * Since it's sorted, this is relatively easy.
	 */
	x = 0;
	for (i = 0; i < component_databases->total_segments; i++)
	{
		int			this_segindex = -1;

		while (x < component_databases->total_segment_dbs)
		{
			this_segindex = component_databases->segment_db_info[x].config->segindex;
			if (this_segindex < i)
				x++;
			else if (this_segindex == i)
				break;
			else if (this_segindex > i)
			{
				ereport(ERROR,
						(errcode(ERRCODE_DATA_EXCEPTION),
						 errmsg("content values not valid in %s table",
								GpSegmentConfigRelationName),
						 errdetail("Content values must be in the range 0 to %d inclusive.",
								   component_databases->total_segments - 1)));
			}
		}
		if (this_segindex != i)
		{
			ereport(ERROR,
					(errcode(ERRCODE_DATA_EXCEPTION),
					 errmsg("content values not valid in %s table",
							GpSegmentConfigRelationName),
					 errdetail("Content values must be in the range 0 to %d inclusive",
							   component_databases->total_segments - 1)));
		}
	}

	for (i = 0; i < component_databases->total_segment_dbs; i++)
	{
		cdbInfo = &component_databases->segment_db_info[i];

		if (cdbInfo->config->role != GP_SEGMENT_CONFIGURATION_ROLE_PRIMARY)
			continue;

		hsEntry = (HostPrimaryCountEntry *) hash_search(hostPrimaryCountHash, cdbInfo->config->hostname, HASH_FIND, &found);
		Assert(found);
		cdbInfo->hostPrimaryCount = hsEntry->segmentCount;
	}

	for (i = 0; i < component_databases->total_entry_dbs; i++)
	{
		cdbInfo = &component_databases->entry_db_info[i];

		if (cdbInfo->config->role != GP_SEGMENT_CONFIGURATION_ROLE_PRIMARY)
			continue;

		hsEntry = (HostPrimaryCountEntry *) hash_search(hostPrimaryCountHash, cdbInfo->config->hostname, HASH_FIND, &found);
		Assert(found);
		cdbInfo->hostPrimaryCount = hsEntry->segmentCount;
	}

	hash_destroy(hostPrimaryCountHash);

	MemoryContextSwitchTo(oldContext);

	return component_databases;
}

/*
 * Helper function to clean up the idle segdbs list of
 * a segment component.
 */
static void
cleanupComponentIdleQEs(CdbComponentDatabaseInfo *cdi, bool includeWriter)
{
	SegmentDatabaseDescriptor	*segdbDesc;
	MemoryContext				oldContext;
	ListCell 					*curItem = NULL;
	ListCell					*nextItem = NULL;
	ListCell 					*prevItem = NULL;

	Assert(CdbComponentsContext);
	oldContext = MemoryContextSwitchTo(CdbComponentsContext);
	curItem = list_head(cdi->freelist);

	while (curItem != NULL)
	{
		segdbDesc = (SegmentDatabaseDescriptor *)lfirst(curItem);
		nextItem = lnext(curItem);
		Assert(segdbDesc);

		if (segdbDesc->isWriter && !includeWriter)
		{
			prevItem = curItem;
			curItem = nextItem;
			continue;
		}

		cdi->freelist = list_delete_cell(cdi->freelist, curItem, prevItem); 
		DECR_COUNT(cdi, numIdleQEs);

		cdbconn_termSegmentDescriptor(segdbDesc);

		curItem = nextItem;

	}

	MemoryContextSwitchTo(oldContext);
}

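/*
 * Disconnect and destroy the cached idle QEs of every entry and segment
 * component; idle writers are destroyed only when includeWriter is true.
 */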
void
cdbcomponent_cleanupIdleQEs(bool includeWriter)
{
	CdbComponentDatabases	*cdbs;
	int						i;

	/* use cdb_component_dbs directly */
	cdbs = cdb_component_dbs;

	if (cdbs == NULL)		
		return;

	if (cdbs->segment_db_info != NULL)
	{
		for (i = 0; i < cdbs->total_segment_dbs; i++)
		{
			CdbComponentDatabaseInfo *cdi = &cdbs->segment_db_info[i];
			cleanupComponentIdleQEs(cdi, includeWriter);
		}
	}

	if (cdbs->entry_db_info != NULL)
	{
		for (i = 0; i < cdbs->total_entry_dbs; i++)
		{
			CdbComponentDatabaseInfo *cdi = &cdbs->entry_db_info[i];
			cleanupComponentIdleQEs(cdi, includeWriter);
		}
	}

	return;
}

/* 
 * This function is called when a transaction is started and the snapshot of
 * segments info will not change until the end of the transaction.
 */
void
cdbcomponent_updateCdbComponents(void)
{
	uint8 ftsVersion= getFtsVersion();
	int expandVersion = GetGpExpandVersion();

	/*
	 * FTS takes responsibility for updating gp_segment_configuration: in each
	 * FTS probe cycle, FTS first gets a copy of the current configuration,
	 * then probes the segments based on it and finally frees the copy at the
	 * end.  In the probe stage, FTS might start/close transactions many
	 * times, so FTS should not update its current copy of
	 * gp_segment_configuration when a new transaction is started.
	 */
	if (am_ftsprobe)
		return;

	PG_TRY();
	{
		if (cdb_component_dbs == NULL)
		{
			cdb_component_dbs = getCdbComponentInfo();
			cdb_component_dbs->fts_version = ftsVersion;
			cdb_component_dbs->expand_version = GetGpExpandVersion();
		}
		else if ((cdb_component_dbs->fts_version != ftsVersion ||
				 cdb_component_dbs->expand_version != expandVersion))
		{
			if (TempNamespaceOidIsValid())
			{
				/*
				 * Do not update here; otherwise, temp files will be lost
				 * on the segments.
				 */
			}
			else
			{
				ELOG_DISPATCHER_DEBUG("FTS rescanned, get new component databases info.");
				cdbcomponent_destroyCdbComponents();
				cdb_component_dbs = getCdbComponentInfo();
				cdb_component_dbs->fts_version = ftsVersion;
				cdb_component_dbs->expand_version = expandVersion;
			}
		}
	}
	PG_CATCH();
	{
		FtsNotifyProber();

		PG_RE_THROW();
	}
	PG_END_TRY();

	Assert(cdb_component_dbs->numActiveQEs == 0);
}

/*
 * cdbcomponent_getCdbComponents
 *
 * Return the current CdbComponentDatabases snapshot, building it if needed.
 * Storage for it and all subsidiary structures is allocated in
 * CdbComponentsContext.
 */
CdbComponentDatabases *
cdbcomponent_getCdbComponents()
{
	PG_TRY();
	{
		if (cdb_component_dbs == NULL)
		{
			cdb_component_dbs = getCdbComponentInfo();
			cdb_component_dbs->fts_version = getFtsVersion();
			cdb_component_dbs->expand_version = GetGpExpandVersion();
		}
	}
	PG_CATCH();
	{
		if (Gp_role == GP_ROLE_DISPATCH)
			FtsNotifyProber();

		PG_RE_THROW();
	}
	PG_END_TRY();

	return cdb_component_dbs;
}

/*
 * cdbcomponent_destroyCdbComponents
 *
 * Disconnect and destroy all idle QEs and release the memory
 * occupied by the CdbComponentDatabases.
 *
 * Callers must clean up QEs used by dispatcher states.
 */
void
cdbcomponent_destroyCdbComponents(void)
{
	/* caller must clean up all segdbs used by dispatcher states */
	Assert(!cdbcomponent_activeQEsExist());

	hash_destroy(segment_ip_cache_htab);
	segment_ip_cache_htab = NULL;

	/* disconnect and destroy idle QEs, including writers */
	cdbcomponent_cleanupIdleQEs(true);

	/* delete the memory context */
	if (CdbComponentsContext)
		MemoryContextDelete(CdbComponentsContext);	
	CdbComponentsContext = NULL;
	cdb_component_dbs = NULL;
}

/*
 * Allocate a segdb.
 *
 * If there is an idle segdb in the freelist, return it; otherwise, initialize
 * a new segdb.
 *
 * An idle segdb already has an established connection to its segment, but a
 * new segdb is not set up yet; callers need to establish the connection
 * themselves.
 */
SegmentDatabaseDescriptor *
cdbcomponent_allocateIdleQE(int contentId, SegmentType segmentType)
{
	SegmentDatabaseDescriptor	*segdbDesc = NULL;
	CdbComponentDatabaseInfo	*cdbinfo;
	ListCell 					*curItem = NULL;
	ListCell 					*nextItem = NULL;
	ListCell					*prevItem = NULL;
	MemoryContext 				oldContext;
	bool						isWriter;

	cdbinfo = cdbcomponent_getComponentInfo(contentId);	

	oldContext = MemoryContextSwitchTo(CdbComponentsContext);

	/*
	 * Always try to pop from the head.  Make sure to push them back to head
	 * in cdbcomponent_recycleIdleQE().
	 */
	curItem = list_head(cdbinfo->freelist);
	while (curItem != NULL)
	{
		SegmentDatabaseDescriptor *tmp =
				(SegmentDatabaseDescriptor *)lfirst(curItem);

		nextItem = lnext(curItem);
		Assert(tmp);

		if ((segmentType == SEGMENTTYPE_EXPLICT_WRITER && !tmp->isWriter) ||
			(segmentType == SEGMENTTYPE_EXPLICT_READER && tmp->isWriter))
		{
			prevItem = curItem;
			curItem = nextItem;
			continue;
		}

		cdbinfo->freelist = list_delete_cell(cdbinfo->freelist, curItem, prevItem); 
		/* update numIdleQEs */
		DECR_COUNT(cdbinfo, numIdleQEs);

		segdbDesc = tmp;
		break;
	}

	if (!segdbDesc)
	{
		/*
		 * 1. for the entrydb, it is never a writer.
		 * 2. the first QE for a content must be a writer.
		 */
		isWriter = contentId == -1 ? false: (cdbinfo->numIdleQEs == 0 && cdbinfo->numActiveQEs == 0);
		segdbDesc = cdbconn_createSegmentDescriptor(cdbinfo, nextQEIdentifer(cdbinfo->cdbs), isWriter);
	}

	cdbconn_setQEIdentifier(segdbDesc, -1);

	cdbinfo->activelist = lcons(segdbDesc, cdbinfo->activelist);
	INCR_COUNT(cdbinfo, numActiveQEs);

	MemoryContextSwitchTo(oldContext);

	return segdbDesc;
}

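/*
 * Decide whether an idle QE can be cached for reuse.  Returns false (the
 * caller then destroys it) if the backend is exiting, the connection is bad,
 * the segment is down, a reader exceeded the gang cache memory limit, or
 * pending results could not be discarded.
 */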
static bool
cleanupQE(SegmentDatabaseDescriptor *segdbDesc)
{
	Assert(segdbDesc != NULL);

#ifdef FAULT_INJECTOR
	if (SIMPLE_FAULT_INJECTOR("cleanup_qe") == FaultInjectorTypeSkip)
		return false;
#endif

	/*
	 * if the process is in the middle of blowing up... then we don't do
	 * anything here.  making libpq and other calls can definitely result in
	 * things getting HUNG.
	 */
	if (proc_exit_inprogress)
		return false;

	if (cdbconn_isBadConnection(segdbDesc))
		return false;

	/* if segment is down, the gang can not be reused */
	if (FtsIsSegmentDown(segdbDesc->segment_database_info))
		return false; 

	/* If a reader exceeds the cached memory limit, destroy it */
	if (!segdbDesc->isWriter &&
		(segdbDesc->conn->mop_high_watermark >> 20) > gp_vmem_protect_gang_cache_limit)
		return false;

	/* Note, we cancel all "still running" queries */
	if (!cdbconn_discardResults(segdbDesc, 20))
	{
		elog(LOG, "cleaning up seg%d while it is still busy", segdbDesc->segindex);
		return false;
	}

	/* QE is no longer associated with a slice. */
	cdbconn_setQEIdentifier(segdbDesc, /* slice index */ -1);	

	return true;
}

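/*
 * Return a QE to its component after use: destroy it when forced, unhealthy,
 * or over the freelist cap; otherwise cache it on the freelist (a writer at
 * the head, readers right after the writers).
 */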
void
cdbcomponent_recycleIdleQE(SegmentDatabaseDescriptor *segdbDesc, bool forceDestroy)
{
	CdbComponentDatabaseInfo	*cdbinfo;
	MemoryContext				oldContext;	
	int							maxLen;
	bool						isWriter;

	Assert(cdb_component_dbs);
	Assert(CdbComponentsContext);

	cdbinfo = segdbDesc->segment_database_info;
	isWriter = segdbDesc->isWriter;

	/* update num of active QEs */
	Assert(list_member_ptr(cdbinfo->activelist, segdbDesc));
	cdbinfo->activelist = list_delete_ptr(cdbinfo->activelist, segdbDesc);
	DECR_COUNT(cdbinfo, numActiveQEs);

	oldContext = MemoryContextSwitchTo(CdbComponentsContext);

	if (forceDestroy || !cleanupQE(segdbDesc))
		goto destroy_segdb;

	/* If the freelist length exceeds gp_cached_gang_threshold, destroy it */
	maxLen = segdbDesc->segindex == -1 ?
					MAX_CACHED_1_GANGS : gp_cached_gang_threshold;
	if (!isWriter && list_length(cdbinfo->freelist) >= maxLen)
		goto destroy_segdb;

	/* Recycle the QE, put it to freelist */
	if (isWriter)
	{
		/* the writer is always at the head of the freelist */
		segdbDesc->segment_database_info->freelist =
			lcons(segdbDesc, segdbDesc->segment_database_info->freelist);
	}
	else
	{
		ListCell   *lastWriter = NULL;
		ListCell   *cell;

		/*
		 * In cdbcomponent_allocateIdleQE() readers are always popped from the
		 * head, so to restore the original order we must push them back onto
		 * the head; keep in mind that readers must be placed after the writers.
		 */

		for (cell = list_head(segdbDesc->segment_database_info->freelist);
			 cell && ((SegmentDatabaseDescriptor *) lfirst(cell))->isWriter;
			 lastWriter = cell, cell = lnext(cell)) ;

		if (lastWriter)
			lappend_cell(segdbDesc->segment_database_info->freelist,
						 lastWriter, segdbDesc);
		else
			segdbDesc->segment_database_info->freelist =
				lcons(segdbDesc, segdbDesc->segment_database_info->freelist);
	}

	INCR_COUNT(cdbinfo, numIdleQEs);

	MemoryContextSwitchTo(oldContext);

	return;

destroy_segdb:

	cdbconn_termSegmentDescriptor(segdbDesc);

	if (isWriter)
	{
		markCurrentGxactWriterGangLost();
	}

	MemoryContextSwitchTo(oldContext);
}

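/*
 * Hand out the next QE identifier, reusing a recycled id from freeCounterList
 * when available, otherwise allocating a fresh one from qeCounter.
 */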
static int
nextQEIdentifer(CdbComponentDatabases *cdbs)
{
	int result;

	if (!cdbs->freeCounterList)
		return cdbs->qeCounter++;

	result = linitial_int(cdbs->freeCounterList);
	cdbs->freeCounterList = list_delete_first(cdbs->freeCounterList);
	return result;
}

bool
cdbcomponent_qesExist(void)
{
	return !cdb_component_dbs ? false :
			(cdb_component_dbs->numIdleQEs > 0 || cdb_component_dbs->numActiveQEs > 0);
}

bool
cdbcomponent_activeQEsExist(void)
{
	return !cdb_component_dbs ? false : cdb_component_dbs->numActiveQEs > 0;
}

/*
 * Find CdbComponentDatabaseInfo in the array by segment index.
 */
CdbComponentDatabaseInfo *
cdbcomponent_getComponentInfo(int contentId)
{
	CdbComponentDatabaseInfo *cdbInfo = NULL;
	CdbComponentDatabases *cdbs;

	cdbs = cdbcomponent_getCdbComponents();

	if (contentId < -1 || contentId >= cdbs->total_segments)
		ereport(FATAL,
				(errcode(ERRCODE_DATA_EXCEPTION),
				 errmsg("unexpected content id %d, should be [-1, %d]",
						contentId, cdbs->total_segments - 1)));
	/* entry db */
	if (contentId == -1)
	{
		cdbInfo = &cdbs->entry_db_info[0];	
		return cdbInfo;
	}

	/* no mirror, segment_db_info is sorted by content id */
	if (cdbs->total_segment_dbs == cdbs->total_segments)
	{
		cdbInfo = &cdbs->segment_db_info[contentId];
		return cdbInfo;
	}

	/* with mirror, segment_db_info is sorted by content id */
	if (cdbs->total_segment_dbs != cdbs->total_segments)
	{
		Assert(cdbs->total_segment_dbs == cdbs->total_segments * 2);
		cdbInfo = &cdbs->segment_db_info[2 * contentId];

		if (!SEGMENT_IS_ACTIVE_PRIMARY(cdbInfo))
		{
			cdbInfo = &cdbs->segment_db_info[2 * contentId + 1];
		}

		return cdbInfo;
	}

	return cdbInfo;
}

static void
ensureInterconnectAddress(void)
{
	/*
	 * If the address type is wildcard, there is no need to populate a unicast
	 * address in interconnect_address.
	 */
	if (Gp_interconnect_address_type == INTERCONNECT_ADDRESS_TYPE_WILDCARD)
	{
		interconnect_address = NULL;
		return;
	}

	Assert(Gp_interconnect_address_type == INTERCONNECT_ADDRESS_TYPE_UNICAST);

	/* If the unicast address has already been assigned, exit early. */
	if (interconnect_address)
		return;

	/*
	 * Retrieve the segment's gp_segment_configuration.address value, in order
	 * to setup interconnect_address
	 */

	if (GpIdentity.segindex >= 0)
	{
		Assert(Gp_role == GP_ROLE_EXECUTE);
		Assert(MyProcPort->laddr.addr.ss_family == AF_INET
				|| MyProcPort->laddr.addr.ss_family == AF_INET6);
		/*
		 * We assume that the QD, using the address in gp_segment_configuration
		 * as its destination IP address, connects to the segment/QE.
		 * So the local address of this connection (MyProcPort->laddr) can be
		 * used for the interconnect.
		 */
		char local_addr[NI_MAXHOST];
		getnameinfo((const struct sockaddr *)&MyProcPort->laddr.addr,
					MyProcPort->laddr.salen,
					local_addr, sizeof(local_addr),
					NULL, 0, NI_NUMERICHOST);
		interconnect_address = MemoryContextStrdup(TopMemoryContext, local_addr);
	}
	else if (Gp_role == GP_ROLE_DISPATCH)
	{
		/*
		 * Here, we can only retrieve the ADDRESS in gp_segment_configuration
		 * from `cdbcomponent*`; we cannot obtain it the way the QEs do.
		 */
		CdbComponentDatabaseInfo *qdInfo;
		qdInfo = cdbcomponent_getComponentInfo(MASTER_CONTENT_ID);
		interconnect_address = MemoryContextStrdup(TopMemoryContext, qdInfo->config->hostip);
	}
	else if (qdHostname && qdHostname[0] != '\0')
	{
		Assert(Gp_role == GP_ROLE_EXECUTE);
		/*
		 * A QE on the master can't get its interconnect address the way a QE
		 * on a primary segment does, because the QD connects to its postmaster
		 * via a unix domain socket.
		 */
		interconnect_address = qdHostname;
	}
	Assert(interconnect_address && strlen(interconnect_address) > 0);
}
/*
 * performs all necessary setup required for Greenplum Database mode.
 *
 * This includes cdblink_setup() and initializing the Motion Layer.
 */
void
cdb_setup(void)
{
	elog(DEBUG1, "Initializing Greenplum components...");

	if (Gp_role != GP_ROLE_UTILITY)
	{
		ensureInterconnectAddress();
		/* Initialize the Motion Layer IPC subsystem. */
		InitMotionLayerIPC();
	}

	/*
	 * A backend process requires consistent state; it cannot proceed until the
	 * dtx recovery process finishes recovering distributed transactions.
	 *
	 * Ignore background worker because bgworker_should_start_mpp() already did
	 * the check.
	 */
	if (!IsBackgroundWorker &&
		Gp_role == GP_ROLE_DISPATCH &&
		!*shmDtmStarted)
	{
		ereport(FATAL,
				(errcode(ERRCODE_CANNOT_CONNECT_NOW),
				 errmsg(POSTMASTER_IN_RECOVERY_MSG),
				 errdetail("waiting for distributed transaction recovery to complete")));
	}
}

/*
 * performs all necessary cleanup required when leaving Greenplum
 * Database mode.  This is also called when the process exits.
 *
 * NOTE: the arguments to this function are here only so that we can
 *		 register it with on_proc_exit().  These parameters should not
 *		 be used since there are some callers to this that pass them
 *		 as NULL.
 *
 */
void
cdb_cleanup(int code pg_attribute_unused(), Datum arg
						pg_attribute_unused())
{
	elog(DEBUG1, "Cleaning up Greenplum components...");

	DisconnectAndDestroyAllGangs(true);

	if (Gp_role == GP_ROLE_DISPATCH)
	{
		if (cdb_total_plans > 0)
		{
			elog(DEBUG1, "session dispatched %d plans %d slices (%f), largest plan %d",
				 cdb_total_plans, cdb_total_slices,
				 ((double) cdb_total_slices / (double) cdb_total_plans),
				 cdb_max_slices);
		}
	}

	if (Gp_role != GP_ROLE_UTILITY)
	{
		/* shutdown our listener socket */
		CleanUpMotionLayerIPC();
	}
}

/*
 * CdbComponentDatabaseInfoCompare:
 * A compare function for CdbComponentDatabaseInfo structs
 * that compares based on segindex, then isprimary descending,
 * for use with qsort.
 */
static int
CdbComponentDatabaseInfoCompare(const void *p1, const void *p2)
{
	const CdbComponentDatabaseInfo *obj1 = (CdbComponentDatabaseInfo *) p1;
	const CdbComponentDatabaseInfo *obj2 = (CdbComponentDatabaseInfo *) p2;

	int			cmp = obj1->config->segindex - obj2->config->segindex;

	if (cmp == 0)
	{
		int			obj2cmp = 0;
		int			obj1cmp = 0;

		if (SEGMENT_IS_ACTIVE_PRIMARY(obj2))
			obj2cmp = 1;

		if (SEGMENT_IS_ACTIVE_PRIMARY(obj1))
			obj1cmp = 1;

		cmp = obj2cmp - obj1cmp;
	}

	return cmp;
}

/*
 * Maintain a cache of names.
 *
 * The keys are all NAMEDATALEN long.
 */
static char *
getDnsCachedAddress(char *name, int port, int elevel, bool use_cache)
{
	SegIpEntry	   *e = NULL;
	char			hostinfo[NI_MAXHOST];

	if (use_cache)
	{
		if (segment_ip_cache_htab == NULL)
		{
			HASHCTL		hash_ctl;

			MemSet(&hash_ctl, 0, sizeof(hash_ctl));

			hash_ctl.keysize = NAMEDATALEN + 1;
			hash_ctl.entrysize = sizeof(SegIpEntry);

			segment_ip_cache_htab = hash_create("segment_dns_cache",
												256, &hash_ctl, HASH_ELEM);
		}
		else
		{
			e = (SegIpEntry *) hash_search(segment_ip_cache_htab,
										   name, HASH_FIND, NULL);
			if (e != NULL)
				return e->hostinfo;
		}
	}

	/*
	 * The name is either not in our cache, or we've been instructed to not
	 * use the cache. Perform the name lookup.
	 */
	if (!use_cache || (use_cache && e == NULL))
	{
		MemoryContext oldContext = NULL;
		int			ret;
		char		portNumberStr[32];
		char	   *service;
		struct addrinfo *addrs = NULL,
				   *addr;
		struct addrinfo hint;

		/* Initialize hint structure */
		MemSet(&hint, 0, sizeof(hint));
		hint.ai_socktype = SOCK_STREAM;
		hint.ai_family = AF_UNSPEC;

		snprintf(portNumberStr, sizeof(portNumberStr), "%d", port);
		service = portNumberStr;

		ret = pg_getaddrinfo_all(name, service, &hint, &addrs);
		if (ret || !addrs)
		{
			if (addrs)
				pg_freeaddrinfo_all(hint.ai_family, addrs);

			/*
			 * If a host name is unknown, whether it is an error depends on its role:
			 * - if it is a primary then it's an error;
			 * - if it is a mirror then it's just a warning;
			 * but we do not know the role information here, so always treat it as a
			 * warning; the callers should check the role and decide what to do.
			 */
			if (ret != EAI_FAIL && elevel == ERROR)
				elevel = WARNING;

			ereport(elevel,
					(errmsg("could not translate host name \"%s\", port \"%d\" to address: %s",
							name, port, gai_strerror(ret))));

			return NULL;
		}

		/* save in the cache context */
		if (use_cache)
			oldContext = MemoryContextSwitchTo(TopMemoryContext);

		hostinfo[0] = '\0';
		for (addr = addrs; addr; addr = addr->ai_next)
		{
#ifdef HAVE_UNIX_SOCKETS
			/* Ignore AF_UNIX sockets, if any are returned. */
			if (addr->ai_family == AF_UNIX)
				continue;
#endif
			if (addr->ai_family == AF_INET) /* IPv4 address */
			{
				memset(hostinfo, 0, sizeof(hostinfo));
				pg_getnameinfo_all((struct sockaddr_storage *) addr->ai_addr, addr->ai_addrlen,
								   hostinfo, sizeof(hostinfo),
								   NULL, 0,
								   NI_NUMERICHOST);

				if (use_cache)
				{
					/* Insert into our cache htab */
					e = (SegIpEntry *) hash_search(segment_ip_cache_htab,
												   name, HASH_ENTER, NULL);
					memcpy(e->hostinfo, hostinfo, sizeof(hostinfo));
				}

				break;
			}
		}

#ifdef HAVE_IPV6

		/*
		 * IPv6 probably would work fine, we'd just need to make sure all the
		 * data structures are big enough for the IPv6 address.  And on some
		 * broken systems, you can get an IPv6 address, but not be able to
		 * bind to it because IPv6 is disabled or missing in the kernel, so
		 * we'd only want to use the IPv6 address if there isn't an IPv4
		 * address.  All we really need to do is test this.
		 */
		if (((!use_cache && !hostinfo[0]) || (use_cache && e == NULL))
			&& addrs->ai_family == AF_INET6)
		{
			addr = addrs;
			/* Get a text representation of the IP address */
			pg_getnameinfo_all((struct sockaddr_storage *) addr->ai_addr, addr->ai_addrlen,
							   hostinfo, sizeof(hostinfo),
							   NULL, 0,
							   NI_NUMERICHOST);

			if (use_cache)
			{
				/* Insert into our cache htab */
				e = (SegIpEntry *) hash_search(segment_ip_cache_htab,
											   name, HASH_ENTER, NULL);
				memcpy(e->hostinfo, hostinfo, sizeof(hostinfo));
			}
		}
#endif

		if (use_cache)
			MemoryContextSwitchTo(oldContext);

		pg_freeaddrinfo_all(hint.ai_family, addrs);
	}

	/* return a pointer to our cache. */
	if (use_cache)
		return e->hostinfo;

	return pstrdup(hostinfo);
}

/*
 * getDnsAddress
 *
 * same as getDnsCachedAddress, but without using the cache. A non-cached
 * version was used inline inside of cdbgang.c, and since it is needed now
 * elsewhere, it is available externally.
 */
char *
getDnsAddress(char *hostname, int port, int elevel)
{
	return getDnsCachedAddress(hostname, port, elevel, false);
}


/*
 * Given a component-db in the system, find the addresses at which it
 * can be reached, appropriately populate the argument-structure, and
 * maintain the ip-lookup-cache.
 */
static void
getAddressesForDBid(GpSegConfigEntry *c, int elevel)
{
	char	   *name;

	Assert(c != NULL);

	/* Use hostname */
	memset(c->hostaddrs, 0, COMPONENT_DBS_MAX_ADDRS * sizeof(char *));

#ifdef FAULT_INJECTOR
	if (am_ftsprobe &&
		SIMPLE_FAULT_INJECTOR("get_dns_cached_address") == FaultInjectorTypeSkip)
	{
		/* inject a dns error for primary of segment 0 */
		if (c->segindex == 0 &&
				c->preferred_role == GP_SEGMENT_CONFIGURATION_ROLE_PRIMARY)
		{
			c->address = pstrdup("dnserrordummyaddress"); 
			c->hostname = pstrdup("dnserrordummyaddress"); 
		}
	}
#endif

	/*
	 * Add an entry, using first the "address" and then the "hostname" as a
	 * fallback.
	 */
	name = getDnsCachedAddress(c->address, c->port, elevel, true);

	if (name)
	{
		c->hostaddrs[0] = pstrdup(name);
		return;
	}

	/* now the hostname. */
	name = getDnsCachedAddress(c->hostname, c->port, elevel, true);
	if (name)
	{
		c->hostaddrs[0] = pstrdup(name);
	}
	else
	{
		c->hostaddrs[0] = NULL;
	}

	return;
}

/*
 * hostPrimaryCountHashTableInit()
 *    Construct a hash table of HostPrimaryCountEntry
 */
static HTAB *
hostPrimaryCountHashTableInit(void)
{
	HASHCTL		info;

	/* Set key and entry sizes. */
	MemSet(&info, 0, sizeof(info));
	info.keysize = MAXHOSTNAMELEN;
	info.entrysize = sizeof(HostPrimaryCountEntry);

	return hash_create("HostSegs", 32, &info, HASH_ELEM);
}

/*
 * Given the total number of primary segment databases and a number of
 * segments to "skip", this routine creates a boolean map (array) the
 * size of the total number of segments and randomly selects entries
 * (total_to_skip of them) to be marked as
 * "skipped". This is used for external tables with the 'gpfdist'
 * protocol where we want to get a number of *random* segdbs to
 * connect to a gpfdist client.
 *
 * Caller of this function should pfree skip_map when done with it.
 */
bool *
makeRandomSegMap(int total_primaries, int total_to_skip)
{
	int			randint;		/* some random int representing a seg    */
	int			skipped = 0;	/* num segs already marked to be skipped */
	bool	   *skip_map;

	skip_map = (bool *) palloc(total_primaries * sizeof(bool));
	MemSet(skip_map, false, total_primaries * sizeof(bool));

	while (total_to_skip != skipped)
	{
		/*
		 * create a random int between 0 and (total_primaries - 1).
		 */
		randint = cdbhashrandomseg(total_primaries);

		/*
		 * mark this random index 'true' in the skip map (marked to be
		 * skipped) unless it was already marked.
		 */
		if (skip_map[randint] == false)
		{
			skip_map[randint] = true;
			skipped++;
		}
	}

	return skip_map;
}

/*
 * Determine the dbid for the master standby
 */
int16
master_standby_dbid(void)
{
	int16		dbid = 0;
	HeapTuple	tup;
	Relation	rel;
	ScanKeyData scankey[2];
	SysScanDesc scan;

	/*
	 * Can only run on a master node; this restriction is due to the reliance
	 * on the gp_segment_configuration table.
	 */
	if (!IS_QUERY_DISPATCHER())
		elog(ERROR, "master_standby_dbid() executed on execution segment");

	/*
	 * SELECT * FROM gp_segment_configuration WHERE content = -1 AND role =
	 * GP_SEGMENT_CONFIGURATION_ROLE_MIRROR
	 */
	rel = table_open(GpSegmentConfigRelationId, AccessShareLock);
	ScanKeyInit(&scankey[0],
				Anum_gp_segment_configuration_content,
				BTEqualStrategyNumber, F_INT2EQ,
				Int16GetDatum(-1));
	ScanKeyInit(&scankey[1],
				Anum_gp_segment_configuration_role,
				BTEqualStrategyNumber, F_CHAREQ,
				CharGetDatum(GP_SEGMENT_CONFIGURATION_ROLE_MIRROR));
	/* no index */
	scan = systable_beginscan(rel, InvalidOid, false,
							  NULL, 2, scankey);

	tup = systable_getnext(scan);

	if (HeapTupleIsValid(tup))
	{
		dbid = ((Form_gp_segment_configuration) GETSTRUCT(tup))->dbid;
		/* We expect a single result, assert this */
		Assert(systable_getnext(scan) == NULL);
	}

	systable_endscan(scan);
	/* no need to hold the lock, it's a catalog */
	table_close(rel, AccessShareLock);

	return dbid;
}

GpSegConfigEntry *
dbid_get_dbinfo(int16 dbid)
{
	HeapTuple	tuple;
	Relation	rel;
	ScanKeyData scankey;
	SysScanDesc scan;
	GpSegConfigEntry *i = NULL;

	/*
	 * Can only run on a master node; this restriction is due to the reliance
	 * on the gp_segment_configuration table.  This may be able to be relaxed
	 * by switching to a different method of checking.
	 */
	if (!IS_QUERY_DISPATCHER())
		elog(ERROR, "dbid_get_dbinfo() executed on execution segment");

	rel = heap_open(GpSegmentConfigRelationId, AccessShareLock);

	/* SELECT * FROM gp_segment_configuration WHERE dbid = :1 */
	ScanKeyInit(&scankey,
				Anum_gp_segment_configuration_dbid,
				BTEqualStrategyNumber, F_INT2EQ,
				Int16GetDatum(dbid));
	scan = systable_beginscan(rel, GpSegmentConfigDbidIndexId, true,
							  NULL, 1, &scankey);

	tuple = systable_getnext(scan);
	if (HeapTupleIsValid(tuple))
	{
		Datum		attr;
		bool		isNull;

		i = palloc(sizeof(GpSegConfigEntry));

		/*
		 * dbid
		 */
		attr = heap_getattr(tuple, Anum_gp_segment_configuration_dbid,
							RelationGetDescr(rel), &isNull);
		Assert(!isNull);
		i->dbid = DatumGetInt16(attr);

		/*
		 * content
		 */
		attr = heap_getattr(tuple, Anum_gp_segment_configuration_content,
							RelationGetDescr(rel), &isNull);
		Assert(!isNull);
		i->segindex = DatumGetInt16(attr);

		/*
		 * role
		 */
		attr = heap_getattr(tuple, Anum_gp_segment_configuration_role,
							RelationGetDescr(rel), &isNull);
		Assert(!isNull);
		i->role = DatumGetChar(attr);

		/*
		 * preferred-role
		 */
		attr = heap_getattr(tuple,
							Anum_gp_segment_configuration_preferred_role,
							RelationGetDescr(rel), &isNull);
		Assert(!isNull);
		i->preferred_role = DatumGetChar(attr);

		/*
		 * mode
		 */
		attr = heap_getattr(tuple, Anum_gp_segment_configuration_mode,
							RelationGetDescr(rel), &isNull);
		Assert(!isNull);
		i->mode = DatumGetChar(attr);

		/*
		 * status
		 */
		attr = heap_getattr(tuple, Anum_gp_segment_configuration_status,
							RelationGetDescr(rel), &isNull);
		Assert(!isNull);
		i->status = DatumGetChar(attr);

		/*
		 * hostname
		 */
		attr = heap_getattr(tuple, Anum_gp_segment_configuration_hostname,
							RelationGetDescr(rel), &isNull);
		Assert(!isNull);
		i->hostname = TextDatumGetCString(attr);

		/*
		 * address
		 */
		attr = heap_getattr(tuple, Anum_gp_segment_configuration_address,
							RelationGetDescr(rel), &isNull);
		Assert(!isNull);
		i->address = TextDatumGetCString(attr);

		/*
		 * port
		 */
		attr = heap_getattr(tuple, Anum_gp_segment_configuration_port,
							RelationGetDescr(rel), &isNull);
		Assert(!isNull);
		i->port = DatumGetInt32(attr);

		Assert(systable_getnext(scan) == NULL); /* should be only 1 */
	}
	else
	{
		elog(ERROR, "could not find configuration entry for dbid %i", dbid);
	}

	systable_endscan(scan);
	heap_close(rel, NoLock);

	return i;
}

/*
 * Obtain the dbid of a segment at a given segment index (i.e., content id)
 * currently fulfilling the role specified. This means that the segment is
 * really performing the role of primary or mirror, irrespective of its
 * preferred role.
 */
int16
contentid_get_dbid(int16 contentid, char role, bool getPreferredRoleNotCurrentRole)
{
	int16		dbid = 0;
	Relation	rel;
	ScanKeyData scankey[2];
	SysScanDesc scan;
	HeapTuple	tup;

	/*
	 * Can only run on a master node; this restriction is due to the reliance
	 * on the gp_segment_configuration table.  This may be able to be relaxed
	 * by switching to a different method of checking.
	 */
	if (!IS_QUERY_DISPATCHER())
		elog(ERROR, "contentid_get_dbid() executed on execution segment");

	rel = heap_open(GpSegmentConfigRelationId, AccessShareLock);

	/* XXX XXX: CHECK THIS  XXX jic 2011/12/09 */
	if (getPreferredRoleNotCurrentRole)
	{
		/*
		 * SELECT * FROM gp_segment_configuration WHERE content = :1 AND
		 * preferred_role = :2
		 */
		ScanKeyInit(&scankey[0],
					Anum_gp_segment_configuration_content,
					BTEqualStrategyNumber, F_INT2EQ,
					Int16GetDatum(contentid));
		ScanKeyInit(&scankey[1],
					Anum_gp_segment_configuration_preferred_role,
					BTEqualStrategyNumber, F_CHAREQ,
					CharGetDatum(role));
		scan = systable_beginscan(rel, GpSegmentConfigContentPreferred_roleIndexId, true,
								  NULL, 2, scankey);
	}
	else
	{
		/*
		 * SELECT * FROM gp_segment_configuration WHERE content = :1 AND role
		 * = :2
		 */
		ScanKeyInit(&scankey[0],
					Anum_gp_segment_configuration_content,
					BTEqualStrategyNumber, F_INT2EQ,
					Int16GetDatum(contentid));
		ScanKeyInit(&scankey[1],
					Anum_gp_segment_configuration_role,
					BTEqualStrategyNumber, F_CHAREQ,
					CharGetDatum(role));
		/* no index */
		scan = systable_beginscan(rel, InvalidOid, false,
								  NULL, 2, scankey);
	}

	tup = systable_getnext(scan);
	if (HeapTupleIsValid(tup))
	{
		dbid = ((Form_gp_segment_configuration) GETSTRUCT(tup))->dbid;
		/* We expect a single result, assert this */
		Assert(systable_getnext(scan) == NULL); /* should be only 1 */
	}

	/* no need to hold the lock, it's a catalog */
	systable_endscan(scan);
	heap_close(rel, AccessShareLock);

	return dbid;
}

List *
cdbcomponent_getCdbComponentsList(void)
{
	CdbComponentDatabases *cdbs;
	List *segments = NIL;
	int i;

	cdbs = cdbcomponent_getCdbComponents();

	for (i = 0; i < cdbs->total_segments; i++)
	{
		segments = lappend_int(segments, i);
	}

	return segments;
}

/*
 * Return the total number of segments for the current snapshot of
 * segment info.
 */
int
getgpsegmentCount(void)
{
	/* 1 represents a singleton postgresql in utility mode */
	int32 numsegments = 1;

	if (Gp_role == GP_ROLE_DISPATCH)
		numsegments = cdbcomponent_getCdbComponents()->total_segments;
	else if (Gp_role == GP_ROLE_EXECUTE)
		numsegments = numsegmentsFromQD;
	/*
	 * If we are in 'Utility & Binary Upgrade' mode, the process must have
	 * been launched by pg_upgrade, so we give it a correct numsegments to
	 * make sure pg_upgrade can run normally.
	 * Only the utility QD process has the complete information in
	 * gp_segment_configuration, so we count the segments in that process.
	 */
	else if (Gp_role == GP_ROLE_UTILITY &&
			 IsBinaryUpgrade &&
			 IS_QUERY_DISPATCHER())
	{
		numsegments = cdbcomponent_getCdbComponents()->total_segments;
	}

	return numsegments;
}

/*
 * IsOnConflictUpdate
 * Return true if a plannedstmt is an upsert: insert ... on conflict do update
 */
bool
IsOnConflictUpdate(PlannedStmt *ps)
{
	Plan      *plan;

	if (ps == NULL || ps->commandType != CMD_INSERT)
		return false;

	plan = ps->planTree;

	if (plan && IsA(plan, Motion))
		plan = outerPlan(plan);

	if (plan == NULL || !IsA(plan, ModifyTable))
		return false;

	return ((ModifyTable *)plan)->onConflictAction == ONCONFLICT_UPDATE;
}

/*
 * Avoid core file generation for this PANIC. It helps to avoid
 * filling up disks during tests and also saves time.
 */
void
AvoidCorefileGeneration()
{
#if defined(HAVE_GETRLIMIT) && defined(RLIMIT_CORE)
	struct rlimit lim;
	getrlimit(RLIMIT_CORE, &lim);
	lim.rlim_cur = 0;
	if (setrlimit(RLIMIT_CORE, &lim) != 0)
	{
		int			save_errno = errno;

		elog(NOTICE,
			 "setrlimit failed for RLIMIT_CORE soft limit to zero. errno: %d (%m).",
			 save_errno);
	}
#endif
}
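
A hedged usage sketch (not part of cdbutil.c): the idle-QE pool above is driven by cdbcomponent_allocateIdleQE() and cdbcomponent_recycleIdleQE(). The sketch below assumes a dispatcher (QD) backend where the Greenplum headers are available; the helper name use_content0_writer_sketch is hypothetical, and establishing the libpq connection for a freshly created descriptor is omitted.

/* Illustrative sketch only -- not part of cdbutil.c. */
#include "postgres.h"

/* Headers assumed to declare the types, enums and cdbcomponent_* API used below. */
#include "cdb/cdbconn.h"
#include "cdb/cdbgang.h"
#include "cdb/cdbutil.h"

/* Hypothetical helper: borrow the writer QE of content 0 and give it back. */
static void
use_content0_writer_sketch(void)
{
	/*
	 * Pop an idle QE for content 0 from the component's freelist; if none is
	 * cached, a new descriptor is created without a connection and the caller
	 * is expected to establish it (omitted here).
	 */
	SegmentDatabaseDescriptor *segdbDesc =
		cdbcomponent_allocateIdleQE(0, SEGMENTTYPE_EXPLICT_WRITER);

	/* ... connect if needed and dispatch work through segdbDesc ... */

	/*
	 * Return the QE: it is cached on the component's freelist for reuse, or
	 * disconnected and destroyed if forced, unhealthy, or over the cache
	 * limits.
	 */
	cdbcomponent_recycleIdleQE(segdbDesc, false /* forceDestroy */);
}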
