greenplumn cdbcat source code

  • 2022-08-18

greenplumn cdbcat code

File path: /src/backend/cdb/cdbcat.c

/*-------------------------------------------------------------------------
 *
 * cdbcat.c
 *	  Routines for dealing with GpPolicy
 *
 * Portions Copyright (c) 2005-2008, Greenplum inc
 * Portions Copyright (c) 2012-Present VMware, Inc. or its affiliates.
 *
 *
 * IDENTIFICATION
 *	    src/backend/cdb/cdbcat.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/external.h"
#include "access/genam.h"
#include "access/hash.h"
#include "access/heapam.h"
#include "access/htup_details.h"
#include "catalog/dependency.h"
#include "catalog/gp_distribution_policy.h"
#include "catalog/indexing.h"
#include "catalog/namespace.h"
#include "catalog/objectaddress.h"
#include "catalog/pg_constraint.h"
#include "catalog/pg_namespace.h"
#include "catalog/pg_opclass.h"
#include "catalog/pg_type.h"
#include "cdb/cdbcat.h"
#include "cdb/cdbhash.h"
#include "cdb/cdbutil.h"
#include "cdb/cdbvars.h"		/* Gp_role */
#include "foreign/foreign.h"
#include "utils/array.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/regproc.h"
#include "utils/syscache.h"
#include "utils/uri.h"

static int errdetails_index_policy(char *attname,
								   Oid policy_indclass,
								   Oid policy_eqop,
								   Oid found_indclass,
								   Oid *exclop,
								   index_check_policy_compatible_context *context);

/*
 * The default numsegments when creating tables.  The value can be an integer
 * between 1 and getgpsegmentCount(), or one of the magic numbers below:
 *
 * - GP_DEFAULT_NUMSEGMENTS_FULL: all the segments;
 * - GP_DEFAULT_NUMSEGMENTS_RANDOM: pick a random set of segments each time;
 * - GP_DEFAULT_NUMSEGMENTS_MINIMAL: the minimal set of segments;
 *
 * A wrapper macro GP_POLICY_DEFAULT_NUMSEGMENTS() is defined to get the
 * default numsegments according to the setting of this variable; always use
 * that macro instead of reading the variable directly.
 */
int			gp_create_table_default_numsegments = GP_DEFAULT_NUMSEGMENTS_FULL;
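
/*
 * Illustrative sketch (not part of the original cdbcat.c): a restatement of
 * what GP_POLICY_DEFAULT_NUMSEGMENTS() is documented to return, based purely
 * on the comment above.  The authoritative macro definition lives elsewhere
 * in the tree; treat this helper as a hypothetical paraphrase.
 */
#ifdef CDBCAT_USAGE_EXAMPLES
static int
example_default_numsegments(void)
{
	switch (gp_create_table_default_numsegments)
	{
		case GP_DEFAULT_NUMSEGMENTS_FULL:
			return getgpsegmentCount();		/* all the segments */
		case GP_DEFAULT_NUMSEGMENTS_MINIMAL:
			return 1;						/* the minimal set */
		case GP_DEFAULT_NUMSEGMENTS_RANDOM:
			/* a random size, re-drawn on each call */
			return random() % getgpsegmentCount() + 1;
		default:
			/* an explicit integer between 1 and getgpsegmentCount() */
			return gp_create_table_default_numsegments;
	}
}
#endif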


GpPolicy *
makeGpPolicy(GpPolicyType ptype, int nattrs, int numsegments)
{
	GpPolicy   *policy;
	size_t		size;
	char	   *p;

	size = MAXALIGN(sizeof(GpPolicy)) +
		MAXALIGN(nattrs * sizeof(AttrNumber)) +
		MAXALIGN(nattrs * sizeof(Oid));
	p = palloc(size);
	policy = (GpPolicy *) p;
	p += MAXALIGN(sizeof(GpPolicy));
	if (nattrs > 0)
	{
		policy->attrs = (AttrNumber *) p;
		p += MAXALIGN(nattrs * sizeof(AttrNumber));
		policy->opclasses = (Oid *) p;
		p += MAXALIGN(nattrs * sizeof(Oid));
	}
	else
	{
		policy->attrs = NULL;
		policy->opclasses = NULL;
	}

	policy->type = T_GpPolicy;
	policy->ptype = ptype;
	policy->numsegments = numsegments;
	policy->nattrs = nattrs;

	Assert(numsegments > 0 ||
		   (ptype == POLICYTYPE_ENTRY && numsegments == -1));

	return policy;
}
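
/*
 * Illustrative note (not part of the original cdbcat.c): makeGpPolicy packs
 * the GpPolicy header and both per-key arrays into a single palloc'd block,
 * so a policy can be released with one pfree() and has good cache locality.
 * A minimal caller sketch:
 */
#ifdef CDBCAT_USAGE_EXAMPLES
static GpPolicy *
example_make_entry_policy(void)
{
	/* Entry policies live only on the coordinator; numsegments is -1. */
	return makeGpPolicy(POLICYTYPE_ENTRY, 0, -1);
}
#endif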

/*
 * createReplicatedGpPolicy -- Create a policy with replicated distribution.
 */
GpPolicy *
createReplicatedGpPolicy(int numsegments)
{
	return makeGpPolicy(POLICYTYPE_REPLICATED, 0, numsegments);
}

/*
 * createRandomPartitionedPolicy -- Create a policy with randomly
 * partitioned distribution
 */
GpPolicy *
createRandomPartitionedPolicy(int numsegments)
{
	return makeGpPolicy(POLICYTYPE_PARTITIONED, 0, numsegments);
}

/*
 * createHashPartitionedPolicy -- Create a policy with data
 * partitioned by keys.
 */
GpPolicy *
createHashPartitionedPolicy(List *keys, List *opclasses, int numsegments)
{
	GpPolicy	*policy;
	ListCell 	*lc;
	ListCell 	*lop;
	int 		idx = 0;
	int 		len = list_length(keys);

	Assert(list_length(keys) == list_length(opclasses));

	if (len == 0)
		return createRandomPartitionedPolicy(numsegments);

	policy = makeGpPolicy(POLICYTYPE_PARTITIONED, len, numsegments);
	forboth(lc, keys, lop, opclasses)
	{
		policy->attrs[idx] = (AttrNumber) lfirst_int(lc);
		policy->opclasses[idx] = (Oid) lfirst_oid(lop);
		idx++;
	}

	return policy;	
}
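
/*
 * Illustrative sketch (not part of the original cdbcat.c): building a policy
 * hash-distributed on the first two columns of a table.  The attribute
 * numbers and opclass OIDs below are hypothetical placeholders; real callers
 * resolve the opclasses from the column types.
 */
#ifdef CDBCAT_USAGE_EXAMPLES
static GpPolicy *
example_make_hash_policy(void)
{
	List	   *keys = list_make2_int(1, 2);				/* attnos, hypothetical */
	List	   *opclasses = list_make2_oid(10001, 10002);	/* opclass OIDs, hypothetical */

	return createHashPartitionedPolicy(keys, opclasses,
									   GP_POLICY_DEFAULT_NUMSEGMENTS());
}
#endif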

/*
 * GpPolicyCopy -- Return a copy of a GpPolicy object.
 *
 * The copy is palloc'ed.
 */
GpPolicy *
GpPolicyCopy(const GpPolicy *src)
{
	GpPolicy   *tgt;
	int i;

	if (!src)
		return NULL;

	tgt = makeGpPolicy(src->ptype, src->nattrs, src->numsegments);

	for (i = 0; i < src->nattrs; i++)
	{
		tgt->attrs[i] = src->attrs[i];
		tgt->opclasses[i] = src->opclasses[i];
	}

	return tgt;
}								/* GpPolicyCopy */

/*
 * GpPolicyEqual -- Test equality of policies (by value only).
 */
bool
GpPolicyEqual(const GpPolicy *lft, const GpPolicy *rgt)
{
	int			i;

	if (lft == rgt)
		return true;

	if (!lft || !rgt)
		return false;

	if (lft->ptype != rgt->ptype)
		return false;

	if (lft->numsegments != rgt->numsegments)
		return false;

	if (lft->nattrs != rgt->nattrs)
		return false;

	for (i = 0; i < lft->nattrs; i++)
	{
		if (lft->attrs[i] != rgt->attrs[i])
			return false;
		if (lft->opclasses[i] != rgt->opclasses[i])
			return false;
	}

	return true;
}								/* GpPolicyEqual */

/*
 * Like GpPolicyEqual, but the attributes are matched by name.  This is
 * useful when comparing the policies of two relations whose attribute
 * numbers may differ (e.g. because of dropped columns).
 */
bool
GpPolicyEqualByName(const TupleDesc ltd, const GpPolicy *lpol,
					const TupleDesc rtd, const GpPolicy *rpol)
{
	int			i;

	if (!lpol || !rpol)
		return false;

	if (lpol->ptype != rpol->ptype)
		return false;

	if (lpol->numsegments != rpol->numsegments)
		return false;

	if (lpol->nattrs != rpol->nattrs)
		return false;

	for (i = 0; i < lpol->nattrs; i++)
	{
		Form_pg_attribute latt = TupleDescAttr(ltd, lpol->attrs[i] - 1);
		Form_pg_attribute ratt = TupleDescAttr(rtd, rpol->attrs[i] - 1);

		if (strcmp(NameStr(latt->attname), NameStr(ratt->attname)) != 0)
			return false;
		if (lpol->opclasses[i] != rpol->opclasses[i])
			return false;
	}

	return true;
}

bool
IsReplicatedTable(Oid relid)
{
	return GpPolicyIsReplicated(GpPolicyFetch(relid));
}

/*
 * Returns true only if the policy is replicated.
 */
bool
GpPolicyIsReplicated(const GpPolicy *policy)
{
	if (policy == NULL)
		return false;

	return policy->ptype == POLICYTYPE_REPLICATED;
}

/*
 * Randomly-partitioned or keys-partitioned
 */
bool
GpPolicyIsPartitioned(const GpPolicy *policy)
{
	return (policy != NULL && policy->ptype == POLICYTYPE_PARTITIONED);
}

bool
GpPolicyIsRandomPartitioned(const GpPolicy *policy)
{
	if (policy == NULL)
		return false;

	return policy->ptype == POLICYTYPE_PARTITIONED &&
			policy->nattrs == 0;
}

bool
GpPolicyIsHashPartitioned(const GpPolicy *policy)
{
	if (policy == NULL)
		return false;

	return policy->ptype == POLICYTYPE_PARTITIONED &&
			policy->nattrs > 0;
}

bool
GpPolicyIsEntry(const GpPolicy *policy)
{
	if (policy == NULL)
		return false;

	return policy->ptype == POLICYTYPE_ENTRY;
}
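
/*
 * Illustrative sketch (not part of the original cdbcat.c): the predicates
 * above classify every policy into one of four disjoint kinds.
 */
#ifdef CDBCAT_USAGE_EXAMPLES
static const char *
example_policy_kind(const GpPolicy *policy)
{
	if (GpPolicyIsEntry(policy))
		return "entry (coordinator-only)";
	if (GpPolicyIsReplicated(policy))
		return "replicated";
	if (GpPolicyIsHashPartitioned(policy))
		return "hash-distributed";
	if (GpPolicyIsRandomPartitioned(policy))
		return "randomly distributed";
	return "unknown";
}
#endif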

/*
 * GpPolicyFetch
 *
 * Looks up the distribution policy of given relation from
 * gp_distribution_policy table (or by implicit rules for external tables)
 * Returns an GpPolicy object, allocated in the current memory context,
 * containing the information.
 *
 * The caller is responsible for passing in a valid relation oid.  This
 * function does not check, and assigns a policy of type POLICYTYPE_ENTRY
 * for any oid not found in gp_distribution_policy.
 */
GpPolicy *
GpPolicyFetch(Oid tbloid)
{
	GpPolicy   *policy = NULL;	/* The result */
	HeapTuple	gp_policy_tuple = NULL;

	/*
	 * EXECUTE-type external tables have an "ON ..." specification.
	 * See if it's "COORDINATOR_ONLY". Other types of external tables have a
	 * gp_distribution_policy row, like normal tables.
	 */
	if (rel_is_external_table(tbloid))
	{
		/*
		 * An external table really should have a catalog entry, but there's
		 * currently a transient state during creation of an external table,
		 * where the pg_class entry has been created and loaded into the
		 * relcache before the catalog entry has been created. Silently
		 * ignore missing catalog rows to cope with that.
		 */
		ExtTableEntry *e = GetExtTableEntryIfExists(tbloid);

		/*
		 * Writable external tables have gp_distribution_policy entries, like
		 * regular tables. Readable external tables are implicitly randomly
		 * distributed, except for "EXECUTE ... ON COORDINATOR" ones.
		 */
		if (e)
		{
			char	   *on_clause = (char *) strVal(linitial(e->execlocations));

			if (!e->iswritable)
			{
				if (strcmp(on_clause, "COORDINATOR_ONLY") == 0)
				{
					return makeGpPolicy(POLICYTYPE_ENTRY, 0, -1);
				}
				return createRandomPartitionedPolicy(getgpsegmentCount());
			}
			else if (strcmp(on_clause, "COORDINATOR_ONLY") == 0)
			{
				ListCell   *cell;
				Assert(e->urilocations != NIL);

				/* set the policy for a writable s3 external table running on the coordinator */
				foreach(cell, e->urilocations)
				{
					const char *uri_str = (char *) strVal(lfirst(cell));
					Uri	*uri = ParseExternalTableUri(uri_str);
					if (uri->protocol == URI_CUSTOM && 0 == pg_strncasecmp(uri->customprotocol, "s3", 2))
					{
						return makeGpPolicy(POLICYTYPE_ENTRY, 0, -1);
					}
				}
			}
		}
	}
	else if (get_rel_relkind(tbloid) == RELKIND_FOREIGN_TABLE)
	{
		/*
		 * Similar to the external table creation, there is a transient state
		 * during creation of a foreign table, where the pg_class entry has
		 * been created, before the pg_foreign_table entry has been created.
		 */
		HeapTuple	tp = SearchSysCache1(FOREIGNTABLEREL, ObjectIdGetDatum(tbloid));

		if (HeapTupleIsValid(tp))
		{
			ReleaseSysCache(tp);

			ForeignTable *f = GetForeignTable(tbloid);

			if (f->exec_location == FTEXECLOCATION_ALL_SEGMENTS)
			{
				/*
				 * Currently, foreign tables do not support a distribution
				 * policy, as opposed to writable external tables. For now,
				 * we will create a randomly partitioned policy for foreign
				 * tables that run on all segments. This allows writing to
				 * foreign tables from all segments when the mpp_execute
				 * option is set to 'all segments'.
				 */
				return createRandomPartitionedPolicy(getgpsegmentCount());
			}
		}
	}

	/*
	 * Select by value of the localoid field
	 *
	 * SELECT * FROM gp_distribution_policy WHERE localoid = :1
	 */
	gp_policy_tuple = SearchSysCache1(GPPOLICYID,
									 ObjectIdGetDatum(tbloid));

	/*
	 * Read the first (and only) tuple.
	 */
	if (HeapTupleIsValid(gp_policy_tuple))
	{
		Form_gp_distribution_policy policyform = (Form_gp_distribution_policy) GETSTRUCT(gp_policy_tuple);
		bool		isNull;
		int			i;
		int			nattrs;
		int2vector *distkey;
		oidvector  *distopclasses;

		/*
		 * Sanity check of numsegments.
		 *
		 * Currently, a Gxact always uses a fixed cluster size after the
		 * Gxact has started.  If a table is expanded after the Gxact
		 * started, we should report an error; otherwise the planner would
		 * arrange a gang whose size is larger than the size of the cluster,
		 * and the dispatcher cannot handle that.
		 */
		if ((Gp_role == GP_ROLE_DISPATCH || Gp_role == GP_ROLE_EXECUTE) &&
			policyform->numsegments > getgpsegmentCount())
		{
			ReleaseSysCache(gp_policy_tuple);
			ereport(ERROR,
					(errcode(ERRCODE_GP_FEATURE_NOT_YET),
					 errmsg("cannot access table \"%s\" in current transaction",
							get_rel_name(tbloid)),
					 errdetail("New segments are concurrently added to the cluster during the execution of current transaction, "
							   "the table has data on some of the new segments, "
							   "but these new segments are invisible and inaccessible to current transaction."),
					 errhint("Re-run the query in a new transaction.")));
		}

		switch (policyform->policytype)
		{
			case SYM_POLICYTYPE_REPLICATED:
				policy = createReplicatedGpPolicy(policyform->numsegments);
				break;
			case SYM_POLICYTYPE_PARTITIONED:
				/*
				 * Get the attributes on which to partition.
				 */
				distkey = (int2vector *) DatumGetPointer(
					SysCacheGetAttr(GPPOLICYID, gp_policy_tuple,
									Anum_gp_distribution_policy_distkey,
									&isNull));

				/*
				 * Get distribution keys only if this table has a policy.
				 */
				if (!isNull)
				{
					nattrs = distkey->dim1;
					distopclasses = (oidvector *) DatumGetPointer(
						SysCacheGetAttr(GPPOLICYID, gp_policy_tuple,
										Anum_gp_distribution_policy_distclass,
										&isNull));
					Assert(!isNull);
					Assert(distopclasses->dim1 == nattrs);
				}
				else
					nattrs = 0;

				/* Create a GpPolicy object. */
				policy = makeGpPolicy(POLICYTYPE_PARTITIONED,
									  nattrs, policyform->numsegments);

				for (i = 0; i < nattrs; i++)
				{
					policy->attrs[i] = distkey->values[i];
					policy->opclasses[i] = distopclasses->values[i];
				}
				break;
			default:
				ReleaseSysCache(gp_policy_tuple);
				elog(ERROR, "unrecognized distribution policy type");	
		}

		ReleaseSysCache(gp_policy_tuple);
	}

	/* Interpret absence of a valid policy row as POLICYTYPE_ENTRY */
	if (policy == NULL)
	{
		return makeGpPolicy(POLICYTYPE_ENTRY, 0, -1);
	}

	return policy;
}								/* GpPolicyFetch */
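
/*
 * Illustrative sketch (not part of the original cdbcat.c): a typical
 * consumer of GpPolicyFetch().  The result is palloc'd in the current memory
 * context and, being a single allocation, can be released with one pfree().
 */
#ifdef CDBCAT_USAGE_EXAMPLES
static void
example_report_distribution(Oid relid)
{
	GpPolicy   *policy = GpPolicyFetch(relid);

	if (GpPolicyIsHashPartitioned(policy))
		elog(NOTICE, "hash-distributed on %d column(s) across %d segment(s)",
			 policy->nattrs, policy->numsegments);
	else if (GpPolicyIsReplicated(policy))
		elog(NOTICE, "replicated across %d segment(s)", policy->numsegments);
	else if (GpPolicyIsRandomPartitioned(policy))
		elog(NOTICE, "randomly distributed across %d segment(s)",
			 policy->numsegments);
	else
		elog(NOTICE, "coordinator-only entry table");

	pfree(policy);
}
#endif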

/*
 * Sets the policy of a table into the gp_distribution_policy table
 * from a GpPolicy structure.
 */
void
GpPolicyStore(Oid tbloid, const GpPolicy *policy)
{
	Relation	gp_policy_rel;
	HeapTuple	gp_policy_tuple = NULL;

	bool		nulls[5];
	Datum		values[5];
	ObjectAddress myself,
				referenced;
	int			i;

	/* Sanity check the policy and its opclasses before storing it. */
	if (policy->ptype == POLICYTYPE_ENTRY)
		elog(ERROR, "cannot store entry-type policy in gp_distribution_policy");
	for (i = 0; i < policy->nattrs; i++)
	{
		if (policy->opclasses[i] == InvalidOid)
			elog(ERROR, "no hash operator class for distribution key column %d", i + 1);
	}

	nulls[0] = false;
	nulls[1] = false;
	nulls[2] = false;
	nulls[3] = false;
	nulls[4] = false;
	values[0] = ObjectIdGetDatum(tbloid);
	values[2] = Int32GetDatum(policy->numsegments);

	/*
	 * Open and lock the gp_distribution_policy catalog.
	 */
	gp_policy_rel = table_open(GpPolicyRelationId, RowExclusiveLock);

	if (GpPolicyIsReplicated(policy))
	{
		values[1] = CharGetDatum(SYM_POLICYTYPE_REPLICATED);
	}
	else
	{
		/*
		 * Convert C arrays into Postgres arrays.
		 */
		Assert(GpPolicyIsPartitioned(policy));

		values[1] = CharGetDatum(SYM_POLICYTYPE_PARTITIONED);
	}
	int2vector *attrnums = buildint2vector(policy->attrs, policy->nattrs);
	oidvector  *opclasses = buildoidvector(policy->opclasses, policy->nattrs);

	values[3] = PointerGetDatum(attrnums);
	values[4] = PointerGetDatum(opclasses);

	gp_policy_tuple = heap_form_tuple(RelationGetDescr(gp_policy_rel), values, nulls);

	/* Insert tuple into the relation */
	CatalogTupleInsert(gp_policy_rel, gp_policy_tuple);

	/*
	 * Register the table as dependent on the operator classes used in the
	 * distribution key.
	 *
	 * XXX: This prevents you from dropping the operator class, which is
	 * good. However, CASCADE behaviour is not so nice: if you do DROP
	 * OPERATOR CLASS CASCADE, we drop the whole table. Ideally, we would
	 * just change the policy to randomly distributed.
	 */
	myself.classId = RelationRelationId;
	myself.objectId = tbloid;
	myself.objectSubId = 0;
	for (i = 0; i < policy->nattrs; i++)
	{
		referenced.classId = OperatorClassRelationId;
		referenced.objectId = policy->opclasses[i];
		referenced.objectSubId = 0;

		recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
	}

	/*
	 * Close the gp_distribution_policy relcache entry without unlocking. We
	 * have updated the catalog: consequently the lock must be held until end
	 * of transaction.
	 */
	table_close(gp_policy_rel, NoLock);
}								/* GpPolicyStore */
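
/*
 * Illustrative sketch (not part of the original cdbcat.c): recording a
 * policy for a newly created table.  Real DDL code paths do this as part of
 * a larger transaction that also creates the pg_class entry.
 */
#ifdef CDBCAT_USAGE_EXAMPLES
static void
example_store_random_policy(Oid relid)
{
	GpPolicy   *policy =
		createRandomPartitionedPolicy(GP_POLICY_DEFAULT_NUMSEGMENTS());

	GpPolicyStore(relid, policy);	/* inserts the gp_distribution_policy row */
}
#endif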

/*
 * Updates the policy of a table in the gp_distribution_policy table
 * from a GpPolicy structure, inserting a new row if none exists yet.
 */
void
GpPolicyReplace(Oid tbloid, const GpPolicy *policy)
{
	Relation	gp_policy_rel;
	HeapTuple	gp_policy_tuple = NULL;
	SysScanDesc scan;
	ScanKeyData skey;
	bool		nulls[5];
	Datum		values[5];
	bool		repl[5];
	ObjectAddress myself,
				referenced;
	int			i;

	/* Sanity check the policy and its opclasses before storing it. */
	if (policy->ptype == POLICYTYPE_ENTRY)
		elog(ERROR, "cannot store entry-type policy in gp_distribution_policy");
	for (i = 0; i < policy->nattrs; i++)
	{
		if (policy->opclasses[i] == InvalidOid)
			elog(ERROR, "no hash operator class for distribution key column %d", i + 1);
	}

	nulls[0] = false;
	nulls[1] = false;
	nulls[2] = false;
	nulls[3] = false;
	nulls[4] = false;
	values[0] = ObjectIdGetDatum(tbloid);
	values[2] = Int32GetDatum(policy->numsegments);

	/*
	 * Open and lock the gp_distribution_policy catalog.
	 */
	gp_policy_rel = table_open(GpPolicyRelationId, RowExclusiveLock);

	if (GpPolicyIsReplicated(policy))
	{
		values[1] = CharGetDatum(SYM_POLICYTYPE_REPLICATED);
	}
	else
	{
		/*
		 * Convert C arrays into Postgres arrays.
		 */
		Assert(GpPolicyIsPartitioned(policy));

		values[1] = CharGetDatum(SYM_POLICYTYPE_PARTITIONED);
	}
	int2vector *attrnums = buildint2vector(policy->attrs, policy->nattrs);
	oidvector  *opclasses = buildoidvector(policy->opclasses, policy->nattrs);

	values[3] = PointerGetDatum(attrnums);
	values[4] = PointerGetDatum(opclasses);

	repl[0] = false;
	repl[1] = true;
	repl[2] = true;
	repl[3] = true;
	repl[4] = true;


	/*
	 * Select by value of the localoid field
	 */
	ScanKeyInit(&skey,
				Anum_gp_distribution_policy_localoid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(tbloid));
	scan = systable_beginscan(gp_policy_rel, GpPolicyLocalOidIndexId, true,
							  NULL, 1, &skey);

	/*
	 * Read the first (and only) tuple.
	 */
	if ((gp_policy_tuple = systable_getnext(scan)) != NULL)
	{
		HeapTuple	newtuple = heap_modify_tuple(gp_policy_tuple,
												 RelationGetDescr(gp_policy_rel),
												 values, nulls, repl);

		CatalogTupleUpdate(gp_policy_rel, &gp_policy_tuple->t_self, newtuple);

		heap_freetuple(newtuple);
	}
	else
	{
		gp_policy_tuple = heap_form_tuple(gp_policy_rel->rd_att, values, nulls);
		CatalogTupleInsert(gp_policy_rel, gp_policy_tuple);
	}
	systable_endscan(scan);

	/*
	 * Remove old dependencies on opclasses, and store dependencies on the
	 * new ones.
	 */
	deleteDependencyRecordsForClass(RelationRelationId, tbloid,
									OperatorClassRelationId, DEPENDENCY_NORMAL);

	myself.classId = RelationRelationId;
	myself.objectId = tbloid;
	myself.objectSubId = 0;
	for (i = 0; i < policy->nattrs; i++)
	{
		referenced.classId = OperatorClassRelationId;
		referenced.objectId = policy->opclasses[i];
		referenced.objectSubId = 0;

		recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
	}

	/*
	 * Close the gp_distribution_policy relcache entry without unlocking. We
	 * have updated the catalog: consequently the lock must be held until end
	 * of transaction.
	 */
	table_close(gp_policy_rel, NoLock);
}								/* GpPolicyReplace */
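
/*
 * Illustrative sketch (not part of the original cdbcat.c): switching an
 * existing table to replicated distribution.  This only rewrites the catalog
 * row; an actual ALTER TABLE ... SET DISTRIBUTED REPLICATED must also
 * redistribute the data.
 */
#ifdef CDBCAT_USAGE_EXAMPLES
static void
example_make_table_replicated(Oid relid)
{
	GpPolicy   *policy = createReplicatedGpPolicy(getgpsegmentCount());

	GpPolicyReplace(relid, policy);	/* update-or-insert semantics */
}
#endif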


/*
 * Removes the policy of a table from the gp_distribution_policy table.
 * Does nothing if the policy doesn't exist.
 */
void
GpPolicyRemove(Oid tbloid)
{
	Relation	gp_policy_rel;
	ScanKeyData scankey;
	SysScanDesc sscan;
	HeapTuple	tuple;

	/*
	 * Open and lock the gp_distribution_policy catalog.
	 */
	gp_policy_rel = table_open(GpPolicyRelationId, RowExclusiveLock);

	/* Delete the policy entry from the catalog. */
	ScanKeyInit(&scankey, Anum_gp_distribution_policy_localoid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(tbloid));

	sscan = systable_beginscan(gp_policy_rel, GpPolicyLocalOidIndexId, true,
							   NULL, 1, &scankey);

	while ((tuple = systable_getnext(sscan)) != NULL)
	{
		CatalogTupleDelete(gp_policy_rel, &tuple->t_self);
	}

	systable_endscan(sscan);

	/*
	 * This is currently only used while dropping the whole relation, which
	 * removes all pg_depend entries. So no need to remove them here.
	 */

	/*
	 * Close the gp_distribution_policy relcache entry without unlocking. We
	 * have updated the catalog: consequently the lock must be held until end
	 * of transaction.
	 */
	table_close(gp_policy_rel, NoLock);
}								/* GpPolicyRemove */

/*
 * Is the supplied GpPolicy compatible with a unique or exclusion constraint
 * index?
 *
 * This is used both when a new index is created (CREATE INDEX), and when
 * a table's distribution key is about to be changed (ALTER TABLE SET
 * DISTRIBUTED BY).
 *
 * The set of columns being indexed needs to be a superset of the distribution
 * key columns. If the table is distributed randomly, no unique / exclusion
 * indexing is supported. If the table is replicated, all constraints are
 * supported.
 *
 * The index is described by 'indattr', 'indclasses', 'exclop', 'nidxatts'
 * parameters. Note that the parameters don't include expressions for an
 * expression index; expressions can never match distribution keys, so
 * they can be ignored here.
 *
 * Returns 'true', if the policy is compatible with the index.
 */
bool
index_check_policy_compatible(GpPolicy *policy,
							  TupleDesc desc,
							  AttrNumber *indattr,
							  Oid *indclasses,
							  Oid *exclop,
							  int nidxatts,
							  bool report_error,
							  index_check_policy_compatible_context *error_context)
{
	int			i;
	int			j;

	/*
	 * POLICYTYPE_ENTRY normally means it's a system table or a table created
	 * in utility mode, so unique/primary key is allowed anywhere.
	 */
	if (GpPolicyIsEntry(policy))
		return true;

	/*
	 * Firstly, unique indexes / exclusion constraints are not supported at
	 * all on randomly distributed tables.
	 *
	 * XXX: The error message here is worded as if we're adding a constraint.
	 * This function is also used for ALTER TABLE SET DISTRIBUTED BY, but as
	 * of this writing, with ALTER TABLE SET DISTRIBUTED BY the caller checks
	 * for these cases before calling this function, with a different error
	 * message. That seems redundant, but as long as the caller does that, we
	 * can make that assumption in the error message.
	 */
	 */
	if (GpPolicyIsRandomPartitioned(policy))
	{
		if (report_error)
		{
			if (error_context->is_primarykey)
				ereport(ERROR,
						(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
						 errmsg("PRIMARY KEY and DISTRIBUTED RANDOMLY are incompatible")));
			else if (exclop)
				ereport(ERROR,
						(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
						 errmsg("exclusion constraint and DISTRIBUTED RANDOMLY are incompatible")));
			else
				ereport(ERROR,
						(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
						 errmsg("UNIQUE and DISTRIBUTED RANDOMLY are incompatible")));
		}
		return false;
	}

	/*
	 * On the other hand, everything is supported on replicated tables.
	 */
	if (GpPolicyIsReplicated(policy))
		return true;

	/*
	 * Otherwise, it's hash distributed.
	 *
	 * Loop through each distribution key column, and check that it's part
	 * of the index.
	 */
	Assert(GpPolicyIsHashPartitioned(policy));
	Assert(policy->nattrs > 0);
	for (i = 0; i < policy->nattrs; i++)
	{
		int			policy_attr;
		Oid			policy_opclass;
		Oid			policy_opfamily;
		Oid			policy_typeid;
		Oid			policy_eqop;
		bool		found;
		bool		found_col;
		Oid			found_col_indclass;

		/* Look up the equality operator for the distribution key opclass */
		policy_attr = policy->attrs[i];
		policy_typeid = TupleDescAttr(desc, policy_attr - 1)->atttypid;
		policy_opclass = policy->opclasses[i];
		policy_opfamily = get_opclass_family(policy_opclass);
		policy_eqop = cdb_eqop_in_hash_opfamily(policy_opfamily, policy_typeid);

		/*
		 * Scan the index columns to see if any of them match the distribution
		 * key.
		 */
		found = false;
		found_col = false;
		found_col_indclass = InvalidOid;
		for (j = 0; j < nidxatts; j++)
		{
			Oid			indopfamily;
			Oid			opcintype;
			Oid			indeqop;

			if (indattr[j] != policy_attr)
				continue;
			found_col = true;

			/*
			 * Is the index's operator class compatible with the
			 * distribution key's operator class? It's compatible if it
			 * has the same equality operator.
			 *
			 * If this is an exclusion constraint, rather than a unique index,
			 * then we compare the exclusion operator instead. The exclusion
			 * operator should be the same operator as the distribution key
			 * opclass's equality operator.
			 */
			if (exclop)
				indeqop = exclop[j];
			else
			{
				found_col_indclass = indclasses[j];

				indopfamily = get_opclass_family(indclasses[j]);
				opcintype = get_opclass_input_type(indclasses[j]);
				indeqop = get_opfamily_member(indopfamily,
											  opcintype,
											  opcintype,
											  BTEqualStrategyNumber);
			}
			if (indeqop == policy_eqop)
			{
				found = true;
				break;
			}
		}

		if (!found)
		{
			/*
			 * If the caller asked for an ERROR, construct a suitable error
			 * message. The details of the message depend on the kind of
			 * constraint it is, and whether the distribution key was missing
			 * from the constraint altogether, or if it just had a different
			 * opclass.
			 */
			if (report_error)
				ereport(ERROR,
						(errdetails_index_policy(NameStr(TupleDescAttr(desc, policy_attr - 1)->attname),
												 policy_opclass,
												 policy_eqop,
												 found_col_indclass,
												 exclop,
												 error_context)));
			return false;
		}
	}

	return true;
}
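
/*
 * Illustrative sketch (not part of the original cdbcat.c): probing whether a
 * two-column UNIQUE index is compatible with a table's policy.  The
 * attribute numbers and opclass OIDs are hypothetical, and the context
 * fields set here are the members this file actually reads in
 * errdetails_index_policy().
 */
#ifdef CDBCAT_USAGE_EXAMPLES
static bool
example_check_unique_index(GpPolicy *policy, TupleDesc desc)
{
	AttrNumber	indattrs[2] = {1, 2};			/* hypothetical index columns */
	Oid			indclasses[2] = {10001, 10002};	/* hypothetical opclass OIDs */
	index_check_policy_compatible_context ctx;

	memset(&ctx, 0, sizeof(ctx));
	ctx.is_unique = true;			/* a plain UNIQUE index */
	ctx.constraint_name = "example_unique_idx";

	/* report_error=false: return false instead of raising ereport(ERROR) */
	return index_check_policy_compatible(policy, desc,
										 indattrs, indclasses,
										 NULL,	/* not an exclusion constraint */
										 2, false, &ctx);
}
#endif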


/*
 * Print the name of an operator class, for error messages.
 */
static const char *
format_opclass(Oid opclass)
{
	HeapTuple	ht_opc;
	Form_pg_opclass opcrec;
	char	   *opcname;
	char	   *nspname;
	const char *result;

	ht_opc = SearchSysCache1(CLAOID, ObjectIdGetDatum(opclass));
	if (!HeapTupleIsValid(ht_opc))
		elog(ERROR, "cache lookup failed for opclass %u", opclass);
	opcrec = (Form_pg_opclass) GETSTRUCT(ht_opc);

	/* do we need to schema-qualify the name? */
	opcname = NameStr(opcrec->opcname);
	if (OpclassIsVisible(opclass))
		result = quote_identifier(opcname);
	else
	{
		nspname = get_namespace_name(opcrec->opcnamespace);
		result = psprintf("%s.%s",
						  quote_identifier(nspname),
						  quote_identifier(opcname));
	}
	ReleaseSysCache(ht_opc);

	return result;
}

static int
errdetails_index_policy(char *attname,
						Oid policy_opclass,
						Oid policy_eqop,
						Oid found_indclass,
						Oid *exclop,
						index_check_policy_compatible_context *context)
{
	errcode(ERRCODE_INVALID_TABLE_DEFINITION);

	/*
	 * Print main message. The wording depends on whether we're ALTERing a
	 * table's distribution key, and the new distribution key isn't compatible
	 * with existing indexes, or if we're trying to build a new index that's
	 * not compatible with the table's distribution key. Both variants contain
	 * the same information, but we try to say that the thing we're changing
	 * is not compatible with the existing stuff, rather than the other way
	 * round.
	 *
	 * In the ALTER TABLE SET DISTRIBUTED BY case, we print the name of the
	 * conflicting constraint/index. When we're adding an index, we leave that
	 * out, because it's right there in the CREATE INDEX or ALTER TABLE ADD
	 * CONSTRAINT command.
	 *
	 * XXX: If it's a CREATE TABLE with multiple UNIQUE constraints, it would
	 * be nice to print out which UNIQUE constraint is causing trouble. But we
	 * can't distinguish CREATE TABLE subcommands from a straight CREATE INDEX
	 * here.
	 */
	if (context->for_alter_dist_policy)
	{
		if (context->is_primarykey)
			errmsg("distribution policy is not compatible with the table's PRIMARY KEY");
		else if (exclop)
			errmsg("distribution policy is not compatible with exclusion constraint \"%s\"",
				   context->constraint_name);
		else
		{
			Assert (context->is_unique);
			if (context->is_constraint)
				errmsg("distribution policy is not compatible with UNIQUE constraint \"%s\"",
					   context->constraint_name);
			else
				errmsg("distribution policy is not compatible with UNIQUE index \"%s\"",
					   context->constraint_name);
		}
	}
	else
	{
		if (context->is_primarykey)
			errmsg("PRIMARY KEY definition must contain all columns in the table's distribution key");
		else if (exclop)
			errmsg("exclusion constraint is not compatible with the table's distribution policy");
		else
		{
			Assert (context->is_unique);
			if (context->is_constraint)
				errmsg("UNIQUE constraint must contain all columns in the table's distribution key");
			else
				errmsg("UNIQUE index must contain all columns in the table's distribution key");
		}
	}

	/*
	 * Print details of which distribution column is causing the trouble.
	 */
	if (exclop)
	{
		errdetail("Distribution key column \"%s\" is not included in the constraint.",
				  attname);
		errhint("Add \"%s\" to the constraint with the %s operator.",
				attname, format_operator(policy_eqop));
	}
	else if (found_indclass != InvalidOid)
	{
		/*
		 * A unique index contained the distribution key column, but with
		 * an incompatible opclass.
		 *
		 * It would be nice to hint at what a compatible operator class
		 * would be, but it'd take some effort to dig that from the catalogs.
		 */
		errdetail("Operator class %s of distribution key column \"%s\" is not compatible with operator class %s used in the constraint.",
				  format_opclass(policy_opclass),
				  attname,
				  format_opclass(found_indclass));
	}
	else
	{
		errdetail("Distribution key column \"%s\" is not included in the constraint.",
				  attname);
	}

	return 0;
}
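
/*
 * Note (illustrative, not part of the original cdbcat.c):
 * errdetails_index_policy() follows PostgreSQL's auxiliary error-function
 * convention.  errcode(), errmsg(), errdetail() and errhint() are called for
 * their side effects on the error data currently being assembled, and the
 * dummy int return value lets the whole function sit directly inside an
 * ereport() invocation, as seen in index_check_policy_compatible() above:
 *
 *     ereport(ERROR, (errdetails_index_policy(...)));
 */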

Related information

greenplumn source code directory

Related articles

greenplumn cdbappendonlystorageformat source code

greenplumn cdbappendonlystorageread source code

greenplumn cdbappendonlystoragewrite source code

greenplumn cdbappendonlyxlog source code

greenplumn cdbbufferedappend source code

greenplumn cdbbufferedread source code

greenplumn cdbcopy source code

greenplumn cdbdistributedsnapshot source code

greenplumn cdbdistributedxacts source code

greenplumn cdbdistributedxid source code
