greenplum cdbutil source code
File path: /src/backend/cdb/cdbutil.c
/*-------------------------------------------------------------------------
*
* cdbutil.c
* Internal utility support functions for Greenplum Database/PostgreSQL.
*
* Portions Copyright (c) 2005-2011, Greenplum inc
* Portions Copyright (c) 2012-Present VMware, Inc. or its affiliates.
*
*
* IDENTIFICATION
* src/backend/cdb/cdbutil.c
*
* NOTES
*
* - According to src/backend/executor/execHeapScan.c
* "tuples returned by heap_getnext() are pointers onto disk
* pages and were not created with palloc() and so should not
* be pfree()'d"
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#ifdef HAVE_SYS_RESOURCE_H
#include <sys/time.h>
#include <sys/resource.h>
#endif
#include <sys/param.h> /* for MAXHOSTNAMELEN */
#include "access/genam.h"
#include "catalog/gp_segment_configuration.h"
#include "common/ip.h"
#include "nodes/makefuncs.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/memutils.h"
#include "catalog/gp_id.h"
#include "catalog/indexing.h"
#include "cdb/cdbhash.h"
#include "cdb/cdbutil.h"
#include "cdb/cdbmotion.h"
#include "cdb/cdbvars.h"
#include "cdb/cdbgang.h"
#include "cdb/cdbdisp_query.h"
#include "cdb/ml_ipc.h" /* listener_setup */
#include "cdb/cdbtm.h"
#include "libpq-fe.h"
#include "libpq-int.h"
#include "cdb/cdbconn.h"
#include "cdb/cdbfts.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "postmaster/fts.h"
#include "postmaster/postmaster.h"
#include "catalog/namespace.h"
#include "utils/gpexpand.h"
#include "access/xact.h"
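/*
 * For entrydb (segindex == -1) descriptors, at most MAX_CACHED_1_GANGS idle
 * readers are kept on the freelist; see the length check in
 * cdbcomponent_recycleIdleQE().
 */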
#define MAX_CACHED_1_GANGS 1
#define INCR_COUNT(cdbinfo, arg) \
(cdbinfo)->arg++; \
(cdbinfo)->cdbs->arg++;
#define DECR_COUNT(cdbinfo, arg) \
(cdbinfo)->arg--; \
(cdbinfo)->cdbs->arg--; \
Assert((cdbinfo)->arg >= 0); \
Assert((cdbinfo)->cdbs->arg >= 0);
#define GPSEGCONFIGDUMPFILE "gpsegconfig_dump"
#define GPSEGCONFIGDUMPFILETMP "gpsegconfig_dump_tmp"
#define GPSEGCONFIGNUMATTR 9
MemoryContext CdbComponentsContext = NULL;
static CdbComponentDatabases *cdb_component_dbs = NULL;
/*
* Helper Functions
*/
static CdbComponentDatabases *getCdbComponentInfo(void);
static void cleanupComponentIdleQEs(CdbComponentDatabaseInfo *cdi, bool includeWriter);
static int CdbComponentDatabaseInfoCompare(const void *p1, const void *p2);
static GpSegConfigEntry * readGpSegConfigFromCatalog(int *total_dbs);
static GpSegConfigEntry * readGpSegConfigFromFTSFiles(int *total_dbs);
static void getAddressesForDBid(GpSegConfigEntry *c, int elevel);
static HTAB *hostPrimaryCountHashTableInit(void);
static int nextQEIdentifer(CdbComponentDatabases *cdbs);
static HTAB *segment_ip_cache_htab = NULL;
int numsegmentsFromQD = -1;
typedef struct SegIpEntry
{
char key[NAMEDATALEN];
char hostinfo[NI_MAXHOST];
} SegIpEntry;
typedef struct HostPrimaryCountEntry
{
char hostname[MAXHOSTNAMELEN];
int segmentCount;
} HostPrimaryCountEntry;
/*
* Helper functions for fetching latest gp_segment_configuration outside of
* the transaction.
*
* In phase 2 of 2PC, the current xact has already been marked
* TRANS_COMMIT/ABORT when the COMMIT_PREPARED or ABORT_PREPARED DTM requests
* are performed. If they fail, the dispatcher disconnects and destroys all
* gangs and fetches the latest segment configuration in order to do
* RETRY_COMMIT_PREPARED or RETRY_ABORT_PREPARED; however, postgres disallows
* catalog lookups outside of xacts.
*
* readGpSegConfigFromFTSFiles() notifies FTS to dump the configuration from
* the catalog to a flat file and then reads the configuration from that file.
*/
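/*
 * Each line of the dump file carries the GPSEGCONFIGNUMATTR (9)
 * space-separated fields written by writeGpSegConfigToFTSFiles(), e.g.
 * (illustrative values only):
 *
 *     1 -1 p p n u 5432 cdw cdw
 *
 * i.e. dbid, content, role, preferred_role, mode, status, port, hostname
 * and address; the datadir column is not dumped.
 */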
static GpSegConfigEntry *
readGpSegConfigFromFTSFiles(int *total_dbs)
{
FILE *fd;
int idx = 0;
int array_size = 500;
GpSegConfigEntry *configs = NULL;
GpSegConfigEntry *config = NULL;
char hostname[MAXHOSTNAMELEN];
char address[MAXHOSTNAMELEN];
char buf[MAXHOSTNAMELEN * 2 + 32];
Assert(!IsTransactionState());
/* notify and wait FTS to finish a probe and update the dump file */
FtsNotifyProber();
fd = AllocateFile(GPSEGCONFIGDUMPFILE, "r");
if (!fd)
elog(ERROR, "could not open gp_segment_configuration dump file:%s:%m", GPSEGCONFIGDUMPFILE);
configs = palloc0(sizeof (GpSegConfigEntry) * array_size);
while (fgets(buf, sizeof(buf), fd))
{
config = &configs[idx];
if (sscanf(buf, "%d %d %c %c %c %c %d %s %s", (int *)&config->dbid, (int *)&config->segindex,
&config->role, &config->preferred_role, &config->mode, &config->status,
&config->port, hostname, address) != GPSEGCONFIGNUMATTR)
{
FreeFile(fd);
elog(ERROR, "invalid data in gp_segment_configuration dump file: %s:%m", GPSEGCONFIGDUMPFILE);
}
config->hostname = pstrdup(hostname);
config->address = pstrdup(address);
idx++;
/*
* Expand the GpSegConfigEntry array if we've used up
* currently allocated space
*/
if (idx >= array_size)
{
array_size = array_size * 2;
configs = (GpSegConfigEntry *)
repalloc(configs, sizeof(GpSegConfigEntry) * array_size);
}
}
FreeFile(fd);
*total_dbs = idx;
return configs;
}
/*
* writeGpSegConfigToFTSFiles() dumps gp_segment_configuration to the file
* GPSEGCONFIGDUMPFILE in $PGDATA; only the FTS process may use this function.
*
* Contents are written to GPSEGCONFIGDUMPFILETMP first and then renamed to
* GPSEGCONFIGDUMPFILE; since the rename is atomic, readers and the writer can
* proceed concurrently without locking.
*/
void
writeGpSegConfigToFTSFiles(void)
{
FILE *fd;
int idx = 0;
int total_dbs = 0;
GpSegConfigEntry *configs = NULL;
GpSegConfigEntry *config = NULL;
Assert(IsTransactionState());
Assert(am_ftsprobe);
fd = AllocateFile(GPSEGCONFIGDUMPFILETMP, "w+");
if (!fd)
elog(ERROR, "could not create tmp file: %s: %m", GPSEGCONFIGDUMPFILETMP);
configs = readGpSegConfigFromCatalog(&total_dbs);
for (idx = 0; idx < total_dbs; idx++)
{
config = &configs[idx];
if (fprintf(fd, "%d %d %c %c %c %c %d %s %s\n", config->dbid, config->segindex,
config->role, config->preferred_role, config->mode, config->status,
config->port, config->hostname, config->address) < 0)
{
FreeFile(fd);
elog(ERROR, "could not dump gp_segment_configuration to file: %s: %m", GPSEGCONFIGDUMPFILETMP);
}
}
FreeFile(fd);
/* rename tmp file to permanent file */
if (rename(GPSEGCONFIGDUMPFILETMP, GPSEGCONFIGDUMPFILE) != 0)
elog(ERROR, "could not rename file %s to file %s: %m",
GPSEGCONFIGDUMPFILETMP, GPSEGCONFIGDUMPFILE);
}
static GpSegConfigEntry *
readGpSegConfigFromCatalog(int *total_dbs)
{
int idx = 0;
int array_size;
bool isNull;
Datum attr;
Relation gp_seg_config_rel;
HeapTuple gp_seg_config_tuple = NULL;
SysScanDesc gp_seg_config_scan;
GpSegConfigEntry *configs;
GpSegConfigEntry *config;
array_size = 500;
configs = palloc0(sizeof(GpSegConfigEntry) * array_size);
gp_seg_config_rel = table_open(GpSegmentConfigRelationId, AccessShareLock);
gp_seg_config_scan = systable_beginscan(gp_seg_config_rel, InvalidOid, false, NULL,
0, NULL);
while (HeapTupleIsValid(gp_seg_config_tuple = systable_getnext(gp_seg_config_scan)))
{
config = &configs[idx];
/* dbid */
attr = heap_getattr(gp_seg_config_tuple, Anum_gp_segment_configuration_dbid, RelationGetDescr(gp_seg_config_rel), &isNull);
Assert(!isNull);
config->dbid = DatumGetInt16(attr);
/* content */
attr = heap_getattr(gp_seg_config_tuple, Anum_gp_segment_configuration_content, RelationGetDescr(gp_seg_config_rel), &isNull);
Assert(!isNull);
config->segindex= DatumGetInt16(attr);
/* role */
attr = heap_getattr(gp_seg_config_tuple, Anum_gp_segment_configuration_role, RelationGetDescr(gp_seg_config_rel), &isNull);
Assert(!isNull);
config->role = DatumGetChar(attr);
/* preferred-role */
attr = heap_getattr(gp_seg_config_tuple, Anum_gp_segment_configuration_preferred_role, RelationGetDescr(gp_seg_config_rel), &isNull);
Assert(!isNull);
config->preferred_role = DatumGetChar(attr);
/* mode */
attr = heap_getattr(gp_seg_config_tuple, Anum_gp_segment_configuration_mode, RelationGetDescr(gp_seg_config_rel), &isNull);
Assert(!isNull);
config->mode = DatumGetChar(attr);
/* status */
attr = heap_getattr(gp_seg_config_tuple, Anum_gp_segment_configuration_status, RelationGetDescr(gp_seg_config_rel), &isNull);
Assert(!isNull);
config->status = DatumGetChar(attr);
/* hostname */
attr = heap_getattr(gp_seg_config_tuple, Anum_gp_segment_configuration_hostname, RelationGetDescr(gp_seg_config_rel), &isNull);
Assert(!isNull);
config->hostname = TextDatumGetCString(attr);
/* address */
attr = heap_getattr(gp_seg_config_tuple, Anum_gp_segment_configuration_address, RelationGetDescr(gp_seg_config_rel), &isNull);
Assert(!isNull);
config->address = TextDatumGetCString(attr);
/* port */
attr = heap_getattr(gp_seg_config_tuple, Anum_gp_segment_configuration_port, RelationGetDescr(gp_seg_config_rel), &isNull);
Assert(!isNull);
config->port = DatumGetInt32(attr);
/* datadir is not dumped*/
idx++;
/*
* Expand the GpSegConfigEntry array if we've used up
* currently allocated space
*/
if (idx >= array_size)
{
array_size = array_size * 2;
configs = (GpSegConfigEntry *)
repalloc(configs, sizeof(GpSegConfigEntry) * array_size);
}
}
/*
* We're done with the catalog config, clean them up, closing all the
* relations we opened.
*/
systable_endscan(gp_seg_config_scan);
table_close(gp_seg_config_rel, AccessShareLock);
*total_dbs = idx;
return configs;
}
/*
* Internal function to initialize each component info
*/
static CdbComponentDatabases *
getCdbComponentInfo(void)
{
MemoryContext oldContext;
CdbComponentDatabaseInfo *cdbInfo;
CdbComponentDatabases *component_databases = NULL;
GpSegConfigEntry *configs;
int i;
int x = 0;
int total_dbs = 0;
bool found;
HostPrimaryCountEntry *hsEntry;
if (!CdbComponentsContext)
CdbComponentsContext = AllocSetContextCreate(TopMemoryContext, "cdb components Context",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
oldContext = MemoryContextSwitchTo(CdbComponentsContext);
HTAB *hostPrimaryCountHash = hostPrimaryCountHashTableInit();
if (IsTransactionState())
configs = readGpSegConfigFromCatalog(&total_dbs);
else
configs = readGpSegConfigFromFTSFiles(&total_dbs);
component_databases = palloc0(sizeof(CdbComponentDatabases));
component_databases->numActiveQEs = 0;
component_databases->numIdleQEs = 0;
component_databases->qeCounter = 0;
component_databases->freeCounterList = NIL;
component_databases->segment_db_info =
(CdbComponentDatabaseInfo *) palloc0(sizeof(CdbComponentDatabaseInfo) * total_dbs);
component_databases->entry_db_info =
(CdbComponentDatabaseInfo *) palloc0(sizeof(CdbComponentDatabaseInfo) * 2);
for (i = 0; i < total_dbs; i++)
{
CdbComponentDatabaseInfo *pRow;
GpSegConfigEntry *config = &configs[i];
if (config->hostname == NULL || strlen(config->hostname) > MAXHOSTNAMELEN)
{
/*
* We should never reach here, but add a sanity check anyway.
* We check the length because MAXHOSTNAMELEN might be smaller than
* hostnames defined in /etc/hosts; those are rare cases.
*/
elog(ERROR,
"Invalid length (%d) of hostname (%s)",
config->hostname == NULL ? 0 : (int) strlen(config->hostname),
config->hostname == NULL ? "" : config->hostname);
}
/* lookup hostip/hostaddrs cache */
config->hostip= NULL;
getAddressesForDBid(config, !am_ftsprobe? ERROR : LOG);
/*
* Make sure we get a valid hostip for a primary here; if the hostip for a
* mirror cannot be obtained, ignore the error.
*/
if (config->hostaddrs[0] == NULL &&
config->role == GP_SEGMENT_CONFIGURATION_ROLE_PRIMARY)
ereport(!am_ftsprobe ? ERROR : LOG,
(errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("cannot resolve network address for dbid=%d", config->dbid)));
if (config->hostaddrs[0] != NULL)
config->hostip = pstrdup(config->hostaddrs[0]);
AssertImply(config->hostip, strlen(config->hostip) <= INET6_ADDRSTRLEN);
/*
* Determine which array to place this rows data in: entry or segment,
* based on the content field.
*/
if (config->segindex >= 0)
{
pRow = &component_databases->segment_db_info[component_databases->total_segment_dbs];
component_databases->total_segment_dbs++;
}
else
{
pRow = &component_databases->entry_db_info[component_databases->total_entry_dbs];
component_databases->total_entry_dbs++;
}
pRow->cdbs = component_databases;
pRow->config = config;
pRow->freelist = NIL;
pRow->activelist = NIL;
pRow->numIdleQEs = 0;
pRow->numActiveQEs = 0;
if (config->role != GP_SEGMENT_CONFIGURATION_ROLE_PRIMARY)
continue;
hsEntry = (HostPrimaryCountEntry *) hash_search(hostPrimaryCountHash, config->hostname, HASH_ENTER, &found);
if (found)
hsEntry->segmentCount++;
else
hsEntry->segmentCount = 1;
}
/*
* Validate that there exists at least one entry and one segment database
* in the configuration
*/
if (component_databases->total_segment_dbs == 0)
{
ereport(ERROR,
(errcode(ERRCODE_CARDINALITY_VIOLATION),
errmsg("number of segment databases cannot be 0")));
}
if (component_databases->total_entry_dbs == 0)
{
ereport(ERROR,
(errcode(ERRCODE_CARDINALITY_VIOLATION),
errmsg("number of entry databases cannot be 0")));
}
/*
* Now sort the data by segindex, isprimary desc
*/
qsort(component_databases->segment_db_info,
component_databases->total_segment_dbs, sizeof(CdbComponentDatabaseInfo),
CdbComponentDatabaseInfoCompare);
qsort(component_databases->entry_db_info,
component_databases->total_entry_dbs, sizeof(CdbComponentDatabaseInfo),
CdbComponentDatabaseInfoCompare);
/*
* Now count the number of distinct segindexes. Since it's sorted, this is
* easy.
*/
for (i = 0; i < component_databases->total_segment_dbs; i++)
{
if (i == 0 ||
(component_databases->segment_db_info[i].config->segindex != component_databases->segment_db_info[i - 1].config->segindex))
{
component_databases->total_segments++;
}
}
/*
* Now validate that our identity is present in the entry databases
*/
for (i = 0; i < component_databases->total_entry_dbs; i++)
{
cdbInfo = &component_databases->entry_db_info[i];
if (cdbInfo->config->dbid == GpIdentity.dbid && cdbInfo->config->segindex == GpIdentity.segindex)
{
break;
}
}
if (i == component_databases->total_entry_dbs)
{
ereport(ERROR,
(errcode(ERRCODE_DATA_EXCEPTION),
errmsg("cannot locate entry database"),
errdetail("Entry database represented by this db in gp_segment_configuration: dbid %d content %d",
GpIdentity.dbid, GpIdentity.segindex)));
}
/*
* Now validate that the segindexes for the segment databases are between
* 0 and (numsegments - 1) inclusive, and that we hit them all.
* Since it's sorted, this is relatively easy.
*/
x = 0;
for (i = 0; i < component_databases->total_segments; i++)
{
int this_segindex = -1;
while (x < component_databases->total_segment_dbs)
{
this_segindex = component_databases->segment_db_info[x].config->segindex;
if (this_segindex < i)
x++;
else if (this_segindex == i)
break;
else if (this_segindex > i)
{
ereport(ERROR,
(errcode(ERRCODE_DATA_EXCEPTION),
errmsg("content values not valid in %s table",
GpSegmentConfigRelationName),
errdetail("Content values must be in the range 0 to %d inclusive.",
component_databases->total_segments - 1)));
}
}
if (this_segindex != i)
{
ereport(ERROR,
(errcode(ERRCODE_DATA_EXCEPTION),
errmsg("content values not valid in %s table",
GpSegmentConfigRelationName),
errdetail("Content values must be in the range 0 to %d inclusive",
component_databases->total_segments - 1)));
}
}
for (i = 0; i < component_databases->total_segment_dbs; i++)
{
cdbInfo = &component_databases->segment_db_info[i];
if (cdbInfo->config->role != GP_SEGMENT_CONFIGURATION_ROLE_PRIMARY)
continue;
hsEntry = (HostPrimaryCountEntry *) hash_search(hostPrimaryCountHash, cdbInfo->config->hostname, HASH_FIND, &found);
Assert(found);
cdbInfo->hostPrimaryCount = hsEntry->segmentCount;
}
for (i = 0; i < component_databases->total_entry_dbs; i++)
{
cdbInfo = &component_databases->entry_db_info[i];
if (cdbInfo->config->role != GP_SEGMENT_CONFIGURATION_ROLE_PRIMARY)
continue;
hsEntry = (HostPrimaryCountEntry *) hash_search(hostPrimaryCountHash, cdbInfo->config->hostname, HASH_FIND, &found);
Assert(found);
cdbInfo->hostPrimaryCount = hsEntry->segmentCount;
}
hash_destroy(hostPrimaryCountHash);
MemoryContextSwitchTo(oldContext);
return component_databases;
}
/*
* Helper function to clean up the idle segdbs list of
* a segment component.
*/
static void
cleanupComponentIdleQEs(CdbComponentDatabaseInfo *cdi, bool includeWriter)
{
SegmentDatabaseDescriptor *segdbDesc;
MemoryContext oldContext;
ListCell *curItem = NULL;
ListCell *nextItem = NULL;
ListCell *prevItem = NULL;
Assert(CdbComponentsContext);
oldContext = MemoryContextSwitchTo(CdbComponentsContext);
curItem = list_head(cdi->freelist);
while (curItem != NULL)
{
segdbDesc = (SegmentDatabaseDescriptor *)lfirst(curItem);
nextItem = lnext(curItem);
Assert(segdbDesc);
if (segdbDesc->isWriter && !includeWriter)
{
prevItem = curItem;
curItem = nextItem;
continue;
}
cdi->freelist = list_delete_cell(cdi->freelist, curItem, prevItem);
DECR_COUNT(cdi, numIdleQEs);
cdbconn_termSegmentDescriptor(segdbDesc);
curItem = nextItem;
}
MemoryContextSwitchTo(oldContext);
}
void
cdbcomponent_cleanupIdleQEs(bool includeWriter)
{
CdbComponentDatabases *cdbs;
int i;
/* use cdb_component_dbs directly */
cdbs = cdb_component_dbs;
if (cdbs == NULL)
return;
if (cdbs->segment_db_info != NULL)
{
for (i = 0; i < cdbs->total_segment_dbs; i++)
{
CdbComponentDatabaseInfo *cdi = &cdbs->segment_db_info[i];
cleanupComponentIdleQEs(cdi, includeWriter);
}
}
if (cdbs->entry_db_info != NULL)
{
for (i = 0; i < cdbs->total_entry_dbs; i++)
{
CdbComponentDatabaseInfo *cdi = &cdbs->entry_db_info[i];
cleanupComponentIdleQEs(cdi, includeWriter);
}
}
return;
}
/*
* This function is called when a transaction is started; the snapshot of
* segment info will not change until the end of the transaction.
*/
void
cdbcomponent_updateCdbComponents(void)
{
uint8 ftsVersion= getFtsVersion();
int expandVersion = GetGpExpandVersion();
/*
* FTS takes responsibility for updating gp_segment_configuration. In each
* FTS probe cycle, FTS first gets a copy of the current configuration, then
* probes the segments based on it, and finally frees the copy. During the
* probe stage, FTS might start/close transactions many times, so FTS
* should not update its current copy of gp_segment_configuration when a new
* transaction is started.
*/
if (am_ftsprobe)
return;
PG_TRY();
{
if (cdb_component_dbs == NULL)
{
cdb_component_dbs = getCdbComponentInfo();
cdb_component_dbs->fts_version = ftsVersion;
cdb_component_dbs->expand_version = GetGpExpandVersion();
}
else if ((cdb_component_dbs->fts_version != ftsVersion ||
cdb_component_dbs->expand_version != expandVersion))
{
if (TempNamespaceOidIsValid())
{
/*
* Do not update here; otherwise, temp files would be lost
* on the segments.
*/
}
else
{
ELOG_DISPATCHER_DEBUG("FTS rescanned, get new component databases info.");
cdbcomponent_destroyCdbComponents();
cdb_component_dbs = getCdbComponentInfo();
cdb_component_dbs->fts_version = ftsVersion;
cdb_component_dbs->expand_version = expandVersion;
}
}
}
PG_CATCH();
{
FtsNotifyProber();
PG_RE_THROW();
}
PG_END_TRY();
Assert(cdb_component_dbs->numActiveQEs == 0);
}
/*
* cdbcomponent_getCdbComponents
*
*
* Storage for the SegmentInstances block and all subsidiary
* structures are allocated from the caller's context.
*/
CdbComponentDatabases *
cdbcomponent_getCdbComponents()
{
PG_TRY();
{
if (cdb_component_dbs == NULL)
{
cdb_component_dbs = getCdbComponentInfo();
cdb_component_dbs->fts_version = getFtsVersion();
cdb_component_dbs->expand_version = GetGpExpandVersion();
}
}
PG_CATCH();
{
if (Gp_role == GP_ROLE_DISPATCH)
FtsNotifyProber();
PG_RE_THROW();
}
PG_END_TRY();
return cdb_component_dbs;
}
/*
* cdbcomponent_destroyCdbComponents
*
* Disconnect and destroy all idle QEs, and release the memory
* occupied by the CdbComponentDatabases.
*
* callers must clean up QEs used by dispatcher states.
*/
void
cdbcomponent_destroyCdbComponents(void)
{
/* caller must clean up all segdbs used by dispatcher states */
Assert(!cdbcomponent_activeQEsExist());
hash_destroy(segment_ip_cache_htab);
segment_ip_cache_htab = NULL;
/* disconnect and destroy idle QEs include writers */
cdbcomponent_cleanupIdleQEs(true);
/* delete the memory context */
if (CdbComponentsContext)
MemoryContextDelete(CdbComponentsContext);
CdbComponentsContext = NULL;
cdb_component_dbs = NULL;
}
/*
* Allocate a segdb
*
* If there is an idle segdb in the freelist, return it; otherwise, initialize
* a new segdb.
*
* An idle segdb already has an established connection to its segment, but a
* new segdb is not set up yet; callers need to establish the connection
* themselves.
*/
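/*
 * Typical caller flow (a sketch, not a contract): the dispatcher allocates
 * one descriptor per content id with cdbcomponent_allocateIdleQE(),
 * establishes the libpq connection itself for any newly created descriptor,
 * and hands each descriptor back through cdbcomponent_recycleIdleQE() when
 * the gang is destroyed so it can be cached or torn down.
 */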
SegmentDatabaseDescriptor *
cdbcomponent_allocateIdleQE(int contentId, SegmentType segmentType)
{
SegmentDatabaseDescriptor *segdbDesc = NULL;
CdbComponentDatabaseInfo *cdbinfo;
ListCell *curItem = NULL;
ListCell *nextItem = NULL;
ListCell *prevItem = NULL;
MemoryContext oldContext;
bool isWriter;
cdbinfo = cdbcomponent_getComponentInfo(contentId);
oldContext = MemoryContextSwitchTo(CdbComponentsContext);
/*
* Always try to pop from the head. Make sure to push them back to head
* in cdbcomponent_recycleIdleQE().
*/
curItem = list_head(cdbinfo->freelist);
while (curItem != NULL)
{
SegmentDatabaseDescriptor *tmp =
(SegmentDatabaseDescriptor *)lfirst(curItem);
nextItem = lnext(curItem);
Assert(tmp);
if ((segmentType == SEGMENTTYPE_EXPLICT_WRITER && !tmp->isWriter) ||
(segmentType == SEGMENTTYPE_EXPLICT_READER && tmp->isWriter))
{
prevItem = curItem;
curItem = nextItem;
continue;
}
cdbinfo->freelist = list_delete_cell(cdbinfo->freelist, curItem, prevItem);
/* update numIdleQEs */
DECR_COUNT(cdbinfo, numIdleQEs);
segdbDesc = tmp;
break;
}
if (!segdbDesc)
{
/*
* 1. An entrydb QE is never a writer.
* 2. The first QE for a segment must be a writer.
*/
isWriter = contentId == -1 ? false: (cdbinfo->numIdleQEs == 0 && cdbinfo->numActiveQEs == 0);
segdbDesc = cdbconn_createSegmentDescriptor(cdbinfo, nextQEIdentifer(cdbinfo->cdbs), isWriter);
}
cdbconn_setQEIdentifier(segdbDesc, -1);
cdbinfo->activelist = lcons(segdbDesc, cdbinfo->activelist);
INCR_COUNT(cdbinfo, numActiveQEs);
MemoryContextSwitchTo(oldContext);
return segdbDesc;
}
static bool
cleanupQE(SegmentDatabaseDescriptor *segdbDesc)
{
Assert(segdbDesc != NULL);
#ifdef FAULT_INJECTOR
if (SIMPLE_FAULT_INJECTOR("cleanup_qe") == FaultInjectorTypeSkip)
return false;
#endif
/*
* if the process is in the middle of blowing up... then we don't do
* anything here. making libpq and other calls can definitely result in
* things getting HUNG.
*/
if (proc_exit_inprogress)
return false;
if (cdbconn_isBadConnection(segdbDesc))
return false;
/* if segment is down, the gang can not be reused */
if (FtsIsSegmentDown(segdbDesc->segment_database_info))
return false;
/* If a reader exceeds the cached memory limitation, destroy it */
if (!segdbDesc->isWriter &&
(segdbDesc->conn->mop_high_watermark >> 20) > gp_vmem_protect_gang_cache_limit)
return false;
/* Note, we cancel all "still running" queries */
if (!cdbconn_discardResults(segdbDesc, 20))
{
elog(LOG, "cleaning up seg%d while it is still busy", segdbDesc->segindex);
return false;
}
/* QE is no longer associated with a slice. */
cdbconn_setQEIdentifier(segdbDesc, /* slice index */ -1);
return true;
}
void
cdbcomponent_recycleIdleQE(SegmentDatabaseDescriptor *segdbDesc, bool forceDestroy)
{
CdbComponentDatabaseInfo *cdbinfo;
MemoryContext oldContext;
int maxLen;
bool isWriter;
Assert(cdb_component_dbs);
Assert(CdbComponentsContext);
cdbinfo = segdbDesc->segment_database_info;
isWriter = segdbDesc->isWriter;
/* update num of active QEs */
Assert(list_member_ptr(cdbinfo->activelist, segdbDesc));
cdbinfo->activelist = list_delete_ptr(cdbinfo->activelist, segdbDesc);
DECR_COUNT(cdbinfo, numActiveQEs);
oldContext = MemoryContextSwitchTo(CdbComponentsContext);
if (forceDestroy || !cleanupQE(segdbDesc))
goto destroy_segdb;
/* If the freelist length exceeds gp_cached_gang_threshold, destroy it */
maxLen = segdbDesc->segindex == -1 ?
MAX_CACHED_1_GANGS : gp_cached_gang_threshold;
if (!isWriter && list_length(cdbinfo->freelist) >= maxLen)
goto destroy_segdb;
/* Recycle the QE, put it to freelist */
if (isWriter)
{
/* the writer is always at the head of the freelist */
segdbDesc->segment_database_info->freelist =
lcons(segdbDesc, segdbDesc->segment_database_info->freelist);
}
else
{
ListCell *lastWriter = NULL;
ListCell *cell;
/*
* In cdbcomponent_allocateIdleQE() readers are always popped from the
* head, so to restore the original order we must push them back to
* the head, keeping in mind that readers must be placed after the writers.
*/
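/*
 * Illustration: with a freelist of [W, W, R1, R2] (writers first), a
 * recycled reader R0 is inserted after the last writer, giving
 * [W, W, R0, R1, R2]; a recycled writer is simply pushed onto the head
 * (see the branch above).
 */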
for (cell = list_head(segdbDesc->segment_database_info->freelist);
cell && ((SegmentDatabaseDescriptor *) lfirst(cell))->isWriter;
lastWriter = cell, cell = lnext(cell)) ;
if (lastWriter)
lappend_cell(segdbDesc->segment_database_info->freelist,
lastWriter, segdbDesc);
else
segdbDesc->segment_database_info->freelist =
lcons(segdbDesc, segdbDesc->segment_database_info->freelist);
}
INCR_COUNT(cdbinfo, numIdleQEs);
MemoryContextSwitchTo(oldContext);
return;
destroy_segdb:
cdbconn_termSegmentDescriptor(segdbDesc);
if (isWriter)
{
markCurrentGxactWriterGangLost();
}
MemoryContextSwitchTo(oldContext);
}
static int
nextQEIdentifer(CdbComponentDatabases *cdbs)
{
int result;
if (!cdbs->freeCounterList)
return cdbs->qeCounter++;
result = linitial_int(cdbs->freeCounterList);
cdbs->freeCounterList = list_delete_first(cdbs->freeCounterList);
return result;
}
bool
cdbcomponent_qesExist(void)
{
return !cdb_component_dbs ? false :
(cdb_component_dbs->numIdleQEs > 0 || cdb_component_dbs->numActiveQEs > 0);
}
bool
cdbcomponent_activeQEsExist(void)
{
return !cdb_component_dbs ? false : cdb_component_dbs->numActiveQEs > 0;
}
/*
* Find CdbComponentDatabaseInfo in the array by segment index.
*/
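/*
 * Illustration of the segment_db_info layout with mirrors: entries are
 * sorted as [content 0 primary, content 0 mirror, content 1 primary, ...],
 * so index 2 * contentId normally holds the active primary and
 * 2 * contentId + 1 is consulted when it does not (e.g. after a failover).
 */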
CdbComponentDatabaseInfo *
cdbcomponent_getComponentInfo(int contentId)
{
CdbComponentDatabaseInfo *cdbInfo = NULL;
CdbComponentDatabases *cdbs;
cdbs = cdbcomponent_getCdbComponents();
if (contentId < -1 || contentId >= cdbs->total_segments)
ereport(FATAL,
(errcode(ERRCODE_DATA_EXCEPTION),
errmsg("unexpected content id %d, should be [-1, %d]",
contentId, cdbs->total_segments - 1)));
/* entry db */
if (contentId == -1)
{
cdbInfo = &cdbs->entry_db_info[0];
return cdbInfo;
}
/* no mirror, segment_db_info is sorted by content id */
if (cdbs->total_segment_dbs == cdbs->total_segments)
{
cdbInfo = &cdbs->segment_db_info[contentId];
return cdbInfo;
}
/* with mirrors, segment_db_info has two adjacent entries per content id */
if (cdbs->total_segment_dbs != cdbs->total_segments)
{
Assert(cdbs->total_segment_dbs == cdbs->total_segments * 2);
cdbInfo = &cdbs->segment_db_info[2 * contentId];
if (!SEGMENT_IS_ACTIVE_PRIMARY(cdbInfo))
{
cdbInfo = &cdbs->segment_db_info[2 * contentId + 1];
}
return cdbInfo;
}
return cdbInfo;
}
static void
ensureInterconnectAddress(void)
{
/*
* If the address type is wildcard, there is no need to populate a unicast
* address in interconnect_address.
*/
if (Gp_interconnect_address_type == INTERCONNECT_ADDRESS_TYPE_WILDCARD)
{
interconnect_address = NULL;
return;
}
Assert(Gp_interconnect_address_type == INTERCONNECT_ADDRESS_TYPE_UNICAST);
/* If the unicast address has already been assigned, exit early. */
if (interconnect_address)
return;
/*
* Retrieve the segment's gp_segment_configuration.address value, in order
* to setup interconnect_address
*/
if (GpIdentity.segindex >= 0)
{
Assert(Gp_role == GP_ROLE_EXECUTE);
Assert(MyProcPort->laddr.addr.ss_family == AF_INET
|| MyProcPort->laddr.addr.ss_family == AF_INET6);
/*
* We assume that the QD, using the address in gp_segment_configuration
* as its destination IP address, connects to the segment/QE.
* So, the local address in the PORT can be used for interconnect.
*/
char local_addr[NI_MAXHOST];
getnameinfo((const struct sockaddr *)&MyProcPort->laddr.addr,
MyProcPort->laddr.salen,
local_addr, sizeof(local_addr),
NULL, 0, NI_NUMERICHOST);
interconnect_address = MemoryContextStrdup(TopMemoryContext, local_addr);
}
else if (Gp_role == GP_ROLE_DISPATCH)
{
/*
* Here, we can only retrieve the ADDRESS in gp_segment_configuration
* from `cdbcomponent*`; we cannot obtain it the same way the QEs do.
*/
CdbComponentDatabaseInfo *qdInfo;
qdInfo = cdbcomponent_getComponentInfo(MASTER_CONTENT_ID);
interconnect_address = MemoryContextStrdup(TopMemoryContext, qdInfo->config->hostip);
}
else if (qdHostname && qdHostname[0] != '\0')
{
Assert(Gp_role == GP_ROLE_EXECUTE);
/*
* A QE on the master can't get its interconnect address the way a QE on a
* primary does, because the QD connects to its postmaster via the unix domain socket.
*/
interconnect_address = qdHostname;
}
Assert(interconnect_address && strlen(interconnect_address) > 0);
}
/*
* performs all necessary setup required for Greenplum Database mode.
*
* This includes cdblink_setup() and initializing the Motion Layer.
*/
void
cdb_setup(void)
{
elog(DEBUG1, "Initializing Greenplum components...");
if (Gp_role != GP_ROLE_UTILITY)
{
ensureInterconnectAddress();
/* Initialize the Motion Layer IPC subsystem. */
InitMotionLayerIPC();
}
/*
* A backend process requires a consistent state; it cannot proceed until the
* dtx recovery process finishes recovering distributed transactions.
*
* Ignore background workers because bgworker_should_start_mpp() already did
* the check.
*/
if (!IsBackgroundWorker &&
Gp_role == GP_ROLE_DISPATCH &&
!*shmDtmStarted)
{
ereport(FATAL,
(errcode(ERRCODE_CANNOT_CONNECT_NOW),
errmsg(POSTMASTER_IN_RECOVERY_MSG),
errdetail("waiting for distributed transaction recovery to complete")));
}
}
/*
* performs all necessary cleanup required when leaving Greenplum
* Database mode. This is also called when the process exits.
*
* NOTE: the arguments to this function are here only so that we can
* register it with on_proc_exit(). These parameters should not
* be used since there are some callers to this that pass them
* as NULL.
*
*/
void
cdb_cleanup(int code pg_attribute_unused(), Datum arg
pg_attribute_unused())
{
elog(DEBUG1, "Cleaning up Greenplum components...");
DisconnectAndDestroyAllGangs(true);
if (Gp_role == GP_ROLE_DISPATCH)
{
if (cdb_total_plans > 0)
{
elog(DEBUG1, "session dispatched %d plans %d slices (%f), largest plan %d",
cdb_total_plans, cdb_total_slices,
((double) cdb_total_slices / (double) cdb_total_plans),
cdb_max_slices);
}
}
if (Gp_role != GP_ROLE_UTILITY)
{
/* shutdown our listener socket */
CleanUpMotionLayerIPC();
}
}
/*
* CdbComponentDatabaseInfoCompare:
* A compare function for CdbComponentDatabaseInfo structs
* that compares based on segindex, isprimary desc
* for use with qsort.
*/
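/*
 * For example, with one mirror per content the sorted order becomes:
 * (content 0, active primary), (content 0, mirror),
 * (content 1, active primary), (content 1, mirror), ...
 */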
static int
CdbComponentDatabaseInfoCompare(const void *p1, const void *p2)
{
const CdbComponentDatabaseInfo *obj1 = (CdbComponentDatabaseInfo *) p1;
const CdbComponentDatabaseInfo *obj2 = (CdbComponentDatabaseInfo *) p2;
int cmp = obj1->config->segindex - obj2->config->segindex;
if (cmp == 0)
{
int obj2cmp = 0;
int obj1cmp = 0;
if (SEGMENT_IS_ACTIVE_PRIMARY(obj2))
obj2cmp = 1;
if (SEGMENT_IS_ACTIVE_PRIMARY(obj1))
obj1cmp = 1;
cmp = obj2cmp - obj1cmp;
}
return cmp;
}
/*
* Maintain a cache of names.
*
* The keys are all NAMEDATALEN long.
*/
static char *
getDnsCachedAddress(char *name, int port, int elevel, bool use_cache)
{
SegIpEntry *e = NULL;
char hostinfo[NI_MAXHOST];
if (use_cache)
{
if (segment_ip_cache_htab == NULL)
{
HASHCTL hash_ctl;
MemSet(&hash_ctl, 0, sizeof(hash_ctl));
hash_ctl.keysize = NAMEDATALEN + 1;
hash_ctl.entrysize = sizeof(SegIpEntry);
segment_ip_cache_htab = hash_create("segment_dns_cache",
256, &hash_ctl, HASH_ELEM);
}
else
{
e = (SegIpEntry *) hash_search(segment_ip_cache_htab,
name, HASH_FIND, NULL);
if (e != NULL)
return e->hostinfo;
}
}
/*
* The name is either not in our cache, or we've been instructed to not
* use the cache. Perform the name lookup.
*/
if (!use_cache || (use_cache && e == NULL))
{
MemoryContext oldContext = NULL;
int ret;
char portNumberStr[32];
char *service;
struct addrinfo *addrs = NULL,
*addr;
struct addrinfo hint;
/* Initialize hint structure */
MemSet(&hint, 0, sizeof(hint));
hint.ai_socktype = SOCK_STREAM;
hint.ai_family = AF_UNSPEC;
snprintf(portNumberStr, sizeof(portNumberStr), "%d", port);
service = portNumberStr;
ret = pg_getaddrinfo_all(name, service, &hint, &addrs);
if (ret || !addrs)
{
if (addrs)
pg_freeaddrinfo_all(hint.ai_family, addrs);
/*
* If a host name is unknown, whether it is an error depends on its role:
* - if it is a primary then it's an error;
* - if it is a mirror then it's just a warning;
* but we do not know the role information here, so always treat it as a
* warning, the callers should check the role and decide what to do.
*/
if (ret != EAI_FAIL && elevel == ERROR)
elevel = WARNING;
ereport(elevel,
(errmsg("could not translate host name \"%s\", port \"%d\" to address: %s",
name, port, gai_strerror(ret))));
return NULL;
}
/* save in the cache context */
if (use_cache)
oldContext = MemoryContextSwitchTo(TopMemoryContext);
hostinfo[0] = '\0';
for (addr = addrs; addr; addr = addr->ai_next)
{
#ifdef HAVE_UNIX_SOCKETS
/* Ignore AF_UNIX sockets, if any are returned. */
if (addr->ai_family == AF_UNIX)
continue;
#endif
if (addr->ai_family == AF_INET) /* IPv4 address */
{
memset(hostinfo, 0, sizeof(hostinfo));
pg_getnameinfo_all((struct sockaddr_storage *) addr->ai_addr, addr->ai_addrlen,
hostinfo, sizeof(hostinfo),
NULL, 0,
NI_NUMERICHOST);
if (use_cache)
{
/* Insert into our cache htab */
e = (SegIpEntry *) hash_search(segment_ip_cache_htab,
name, HASH_ENTER, NULL);
memcpy(e->hostinfo, hostinfo, sizeof(hostinfo));
}
break;
}
}
#ifdef HAVE_IPV6
/*
* IPv6 probably would work fine, we'd just need to make sure all the
* data structures are big enough for the IPv6 address. And on some
* broken systems, you can get an IPv6 address, but not be able to
* bind to it because IPv6 is disabled or missing in the kernel, so
* we'd only want to use the IPv6 address if there isn't an IPv4
* address. All we really need to do is test this.
*/
if (((!use_cache && !hostinfo[0]) || (use_cache && e == NULL))
&& addrs->ai_family == AF_INET6)
{
addr = addrs;
/* Get a text representation of the IP address */
pg_getnameinfo_all((struct sockaddr_storage *) addr->ai_addr, addr->ai_addrlen,
hostinfo, sizeof(hostinfo),
NULL, 0,
NI_NUMERICHOST);
if (use_cache)
{
/* Insert into our cache htab */
e = (SegIpEntry *) hash_search(segment_ip_cache_htab,
name, HASH_ENTER, NULL);
memcpy(e->hostinfo, hostinfo, sizeof(hostinfo));
}
}
#endif
if (use_cache)
MemoryContextSwitchTo(oldContext);
pg_freeaddrinfo_all(hint.ai_family, addrs);
}
/* return a pointer to our cache. */
if (use_cache)
return e->hostinfo;
return pstrdup(hostinfo);
}
/*
* getDnsAddress
*
* same as getDnsCachedAddress, but without using the cache. A non-cached
* version was used inline inside of cdbgang.c, and since it is needed now
* elsewhere, it is available externally.
*/
char *
getDnsAddress(char *hostname, int port, int elevel)
{
return getDnsCachedAddress(hostname, port, elevel, false);
}
/*
* Given a component-db in the system, find the addresses at which it
* can be reached, appropriately populate the argument-structure, and
* maintain the ip-lookup-cache.
*/
static void
getAddressesForDBid(GpSegConfigEntry *c, int elevel)
{
char *name;
Assert(c != NULL);
/* Use hostname */
memset(c->hostaddrs, 0, COMPONENT_DBS_MAX_ADDRS * sizeof(char *));
#ifdef FAULT_INJECTOR
if (am_ftsprobe &&
SIMPLE_FAULT_INJECTOR("get_dns_cached_address") == FaultInjectorTypeSkip)
{
/* inject a dns error for primary of segment 0 */
if (c->segindex == 0 &&
c->preferred_role == GP_SEGMENT_CONFIGURATION_ROLE_PRIMARY)
{
c->address = pstrdup("dnserrordummyaddress");
c->hostname = pstrdup("dnserrordummyaddress");
}
}
#endif
/*
* add an entry, using first the "address" and then the "hostname" as a
* fallback.
*/
name = getDnsCachedAddress(c->address, c->port, elevel, true);
if (name)
{
c->hostaddrs[0] = pstrdup(name);
return;
}
/* now the hostname. */
name = getDnsCachedAddress(c->hostname, c->port, elevel, true);
if (name)
{
c->hostaddrs[0] = pstrdup(name);
}
else
{
c->hostaddrs[0] = NULL;
}
return;
}
/*
* hostPrimaryCountHashTableInit()
* Construct a hash table of HostPrimaryCountEntry
*/
static HTAB *
hostPrimaryCountHashTableInit(void)
{
HASHCTL info;
/* Set key and entry sizes. */
MemSet(&info, 0, sizeof(info));
info.keysize = MAXHOSTNAMELEN;
info.entrysize = sizeof(HostPrimaryCountEntry);
return hash_create("HostSegs", 32, &info, HASH_ELEM);
}
/*
* Given the total number of primary segment databases and a number of
* segments to "skip", this routine creates a boolean map (array) the
* size of the total number of segments and randomly selects several
* entries (total_to_skip of them) to be marked as
* "skipped". This is used for external tables with the 'gpfdist'
* protocol where we want to get a number of *random* segdbs to
* connect to a gpfdist client.
*
* Caller of this function should pfree skip_map when done with it.
*/
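/*
 * Example: with total_primaries = 4 and total_to_skip = 2, one possible
 * result is {true, false, true, false}, i.e. segments 0 and 2 are skipped.
 */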
bool *
makeRandomSegMap(int total_primaries, int total_to_skip)
{
int randint; /* some random int representing a seg */
int skipped = 0; /* num segs already marked to be skipped */
bool *skip_map;
skip_map = (bool *) palloc(total_primaries * sizeof(bool));
MemSet(skip_map, false, total_primaries * sizeof(bool));
while (total_to_skip != skipped)
{
/*
* create a random int between 0 and (total_primaries - 1).
*/
randint = cdbhashrandomseg(total_primaries);
/*
* mark this random index 'true' in the skip map (marked to be
* skipped) unless it was already marked.
*/
if (skip_map[randint] == false)
{
skip_map[randint] = true;
skipped++;
}
}
return skip_map;
}
/*
* Determine the dbid for the master standby
*/
int16
master_standby_dbid(void)
{
int16 dbid = 0;
HeapTuple tup;
Relation rel;
ScanKeyData scankey[2];
SysScanDesc scan;
/*
* Can only run on a master node, this restriction is due to the reliance
* on the gp_segment_configuration table.
*/
if (!IS_QUERY_DISPATCHER())
elog(ERROR, "master_standby_dbid() executed on execution segment");
/*
* SELECT * FROM gp_segment_configuration WHERE content = -1 AND role =
* GP_SEGMENT_CONFIGURATION_ROLE_MIRROR
*/
rel = table_open(GpSegmentConfigRelationId, AccessShareLock);
ScanKeyInit(&scankey[0],
Anum_gp_segment_configuration_content,
BTEqualStrategyNumber, F_INT2EQ,
Int16GetDatum(-1));
ScanKeyInit(&scankey[1],
Anum_gp_segment_configuration_role,
BTEqualStrategyNumber, F_CHAREQ,
CharGetDatum(GP_SEGMENT_CONFIGURATION_ROLE_MIRROR));
/* no index */
scan = systable_beginscan(rel, InvalidOid, false,
NULL, 2, scankey);
tup = systable_getnext(scan);
if (HeapTupleIsValid(tup))
{
dbid = ((Form_gp_segment_configuration) GETSTRUCT(tup))->dbid;
/* We expect a single result, assert this */
Assert(systable_getnext(scan) == NULL);
}
systable_endscan(scan);
/* no need to hold the lock, it's a catalog */
table_close(rel, AccessShareLock);
return dbid;
}
GpSegConfigEntry *
dbid_get_dbinfo(int16 dbid)
{
HeapTuple tuple;
Relation rel;
ScanKeyData scankey;
SysScanDesc scan;
GpSegConfigEntry *i = NULL;
/*
* Can only run on a master node, this restriction is due to the reliance
* on the gp_segment_configuration table. This may be able to be relaxed
* by switching to a different method of checking.
*/
if (!IS_QUERY_DISPATCHER())
elog(ERROR, "dbid_get_dbinfo() executed on execution segment");
rel = heap_open(GpSegmentConfigRelationId, AccessShareLock);
/* SELECT * FROM gp_segment_configuration WHERE dbid = :1 */
ScanKeyInit(&scankey,
Anum_gp_segment_configuration_dbid,
BTEqualStrategyNumber, F_INT2EQ,
Int16GetDatum(dbid));
scan = systable_beginscan(rel, GpSegmentConfigDbidIndexId, true,
NULL, 1, &scankey);
tuple = systable_getnext(scan);
if (HeapTupleIsValid(tuple))
{
Datum attr;
bool isNull;
i = palloc(sizeof(GpSegConfigEntry));
/*
* dbid
*/
attr = heap_getattr(tuple, Anum_gp_segment_configuration_dbid,
RelationGetDescr(rel), &isNull);
Assert(!isNull);
i->dbid = DatumGetInt16(attr);
/*
* content
*/
attr = heap_getattr(tuple, Anum_gp_segment_configuration_content,
RelationGetDescr(rel), &isNull);
Assert(!isNull);
i->segindex = DatumGetInt16(attr);
/*
* role
*/
attr = heap_getattr(tuple, Anum_gp_segment_configuration_role,
RelationGetDescr(rel), &isNull);
Assert(!isNull);
i->role = DatumGetChar(attr);
/*
* preferred-role
*/
attr = heap_getattr(tuple,
Anum_gp_segment_configuration_preferred_role,
RelationGetDescr(rel), &isNull);
Assert(!isNull);
i->preferred_role = DatumGetChar(attr);
/*
* mode
*/
attr = heap_getattr(tuple, Anum_gp_segment_configuration_mode,
RelationGetDescr(rel), &isNull);
Assert(!isNull);
i->mode = DatumGetChar(attr);
/*
* status
*/
attr = heap_getattr(tuple, Anum_gp_segment_configuration_status,
RelationGetDescr(rel), &isNull);
Assert(!isNull);
i->status = DatumGetChar(attr);
/*
* hostname
*/
attr = heap_getattr(tuple, Anum_gp_segment_configuration_hostname,
RelationGetDescr(rel), &isNull);
Assert(!isNull);
i->hostname = TextDatumGetCString(attr);
/*
* address
*/
attr = heap_getattr(tuple, Anum_gp_segment_configuration_address,
RelationGetDescr(rel), &isNull);
Assert(!isNull);
i->address = TextDatumGetCString(attr);
/*
* port
*/
attr = heap_getattr(tuple, Anum_gp_segment_configuration_port,
RelationGetDescr(rel), &isNull);
Assert(!isNull);
i->port = DatumGetInt32(attr);
Assert(systable_getnext(scan) == NULL); /* should be only 1 */
}
else
{
elog(ERROR, "could not find configuration entry for dbid %i", dbid);
}
systable_endscan(scan);
heap_close(rel, NoLock);
return i;
}
/*
* Obtain the dbid of a segment at a given segment index (i.e., content id)
* currently fulfilling the role specified. This means that the segment is
* really performing the role of primary or mirror, irrespective of their
* preferred role.
*/
int16
contentid_get_dbid(int16 contentid, char role, bool getPreferredRoleNotCurrentRole)
{
int16 dbid = 0;
Relation rel;
ScanKeyData scankey[2];
SysScanDesc scan;
HeapTuple tup;
/*
* Can only run on a master node, this restriction is due to the reliance
* on the gp_segment_configuration table. This may be able to be relaxed
* by switching to a different method of checking.
*/
if (!IS_QUERY_DISPATCHER())
elog(ERROR, "contentid_get_dbid() executed on execution segment");
rel = heap_open(GpSegmentConfigRelationId, AccessShareLock);
/* XXX XXX: CHECK THIS XXX jic 2011/12/09 */
if (getPreferredRoleNotCurrentRole)
{
/*
* SELECT * FROM gp_segment_configuration WHERE content = :1 AND
* preferred_role = :2
*/
ScanKeyInit(&scankey[0],
Anum_gp_segment_configuration_content,
BTEqualStrategyNumber, F_INT2EQ,
Int16GetDatum(contentid));
ScanKeyInit(&scankey[1],
Anum_gp_segment_configuration_preferred_role,
BTEqualStrategyNumber, F_CHAREQ,
CharGetDatum(role));
scan = systable_beginscan(rel, GpSegmentConfigContentPreferred_roleIndexId, true,
NULL, 2, scankey);
}
else
{
/*
* SELECT * FROM gp_segment_configuration WHERE content = :1 AND role
* = :2
*/
ScanKeyInit(&scankey[0],
Anum_gp_segment_configuration_content,
BTEqualStrategyNumber, F_INT2EQ,
Int16GetDatum(contentid));
ScanKeyInit(&scankey[1],
Anum_gp_segment_configuration_role,
BTEqualStrategyNumber, F_CHAREQ,
CharGetDatum(role));
/* no index */
scan = systable_beginscan(rel, InvalidOid, false,
NULL, 2, scankey);
}
tup = systable_getnext(scan);
if (HeapTupleIsValid(tup))
{
dbid = ((Form_gp_segment_configuration) GETSTRUCT(tup))->dbid;
/* We expect a single result, assert this */
Assert(systable_getnext(scan) == NULL); /* should be only 1 */
}
/* no need to hold the lock, it's a catalog */
systable_endscan(scan);
heap_close(rel, AccessShareLock);
return dbid;
}
List *
cdbcomponent_getCdbComponentsList(void)
{
CdbComponentDatabases *cdbs;
List *segments = NIL;
int i;
cdbs = cdbcomponent_getCdbComponents();
for (i = 0; i < cdbs->total_segments; i++)
{
segments = lappend_int(segments, i);
}
return segments;
}
/*
* return the number of total segments for current snapshot of
* segments info
*/
int
getgpsegmentCount(void)
{
/* 1 represents a singleton postgresql in utility mode */
int32 numsegments = 1;
if (Gp_role == GP_ROLE_DISPATCH)
numsegments = cdbcomponent_getCdbComponents()->total_segments;
else if (Gp_role == GP_ROLE_EXECUTE)
numsegments = numsegmentsFromQD;
/*
* If we are in 'Utility & Binary Upgrade' mode, it must have been launched
* by pg_upgrade, so we give it a correct numsegments to make
* sure pg_upgrade can run normally.
* Only a utility-mode QD process has the entire information in
* gp_segment_configuration, so we count the segments in this
* process.
*/
else if (Gp_role == GP_ROLE_UTILITY &&
IsBinaryUpgrade &&
IS_QUERY_DISPATCHER())
{
numsegments = cdbcomponent_getCdbComponents()->total_segments;
}
return numsegments;
}
/*
* IsOnConflictUpdate
* Return true if a plannedstmt is an upsert: insert ... on conflict do update
*/
bool
IsOnConflictUpdate(PlannedStmt *ps)
{
Plan *plan;
if (ps == NULL || ps->commandType != CMD_INSERT)
return false;
plan = ps->planTree;
if (plan && IsA(plan, Motion))
plan = outerPlan(plan);
if (plan == NULL || !IsA(plan, ModifyTable))
return false;
return ((ModifyTable *)plan)->onConflictAction == ONCONFLICT_UPDATE;
}
/*
* Avoid core file generation for this PANIC. It helps to avoid
* filling up disks during tests and also saves time.
*/
void
AvoidCorefileGeneration()
{
#if defined(HAVE_GETRLIMIT) && defined(RLIMIT_CORE)
struct rlimit lim;
getrlimit(RLIMIT_CORE, &lim);
lim.rlim_cur = 0;
if (setrlimit(RLIMIT_CORE, &lim) != 0)
{
int save_errno = errno;
elog(NOTICE,
"setrlimit failed for RLIMIT_CORE soft limit to zero. errno: %d (%m).",
save_errno);
}
#endif
}