greenplumn cdbmutate 源码

  • 2022-08-18
  • 浏览 (365)

greenplumn cdbmutate 代码

文件路径:/src/backend/cdb/cdbmutate.c

/*-------------------------------------------------------------------------
 *
 * cdbmutate.c
 *	  Parallelize a PostgreSQL sequential plan tree.
 *
 * Portions Copyright (c) 2004-2008, Greenplum inc
 * Portions Copyright (c) 2012-Present VMware, Inc. or its affiliates.
 *
 *
 * IDENTIFICATION
 *	    src/backend/cdb/cdbmutate.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/relation.h"
#include "access/xact.h"
#include "parser/parsetree.h"	/* for rt_fetch() */
#include "optimizer/planmain.h"
#include "parser/parse_relation.h"
#include "utils/fmgroids.h"
#include "utils/guc.h"
#include "utils/lsyscache.h"
#include "utils/datum.h"
#include "utils/syscache.h"
#include "optimizer/clauses.h"
#include "optimizer/optimizer.h"
#include "optimizer/pathnode.h"
#include "nodes/makefuncs.h"

#include "commands/tablecmds.h"
#include "commands/tablespace.h"
#include "commands/trigger.h"
#include "catalog/pg_operator.h"
#include "catalog/pg_proc.h"

#include "cdb/cdbhash.h"
#include "cdb/cdbllize.h"
#include "cdb/cdbmutate.h"
#include "cdb/cdbplan.h"
#include "cdb/cdbpullup.h"
#include "cdb/cdbvars.h"
#include "cdb/cdbutil.h"
#include "cdb/cdbtargeteddispatch.h"

#include "executor/executor.h"

typedef struct
{
	plan_tree_base_prefix base; /* Required prefix for
								 * plan_tree_walker/mutator */
	EState	   *estate;
	bool		single_row_insert;
	List	   *cursorPositions;
} pre_dispatch_function_evaluation_context;

/*
 * Forward Declarations
 */
static Node *pre_dispatch_function_evaluation_mutator(Node *node,
										 pre_dispatch_function_evaluation_context *context);
static bool replace_shareinput_targetlists_walker(Node *node, PlannerInfo *root, bool fPop);


Motion *
make_union_motion(Plan *lefttree)
{
	Motion	   *motion;

	motion = make_motion(NULL, lefttree,
						 0, NULL, NULL, NULL, NULL /* no ordering */);

	motion->motionType = MOTIONTYPE_GATHER;
	motion->hashExprs = NIL;
	motion->hashFuncs = NULL;

	return motion;
}

Motion *
make_sorted_union_motion(PlannerInfo *root, Plan *lefttree, int numSortCols,
						 AttrNumber *sortColIdx, Oid *sortOperators,
						 Oid *collations, bool *nullsFirst)
{
	Motion	   *motion;

	motion = make_motion(root, lefttree,
						 numSortCols, sortColIdx, sortOperators, collations, nullsFirst);
	motion->motionType = MOTIONTYPE_GATHER;
	motion->hashExprs = NIL;
	motion->hashFuncs = NULL;

	return motion;
}

Motion *
make_hashed_motion(Plan *lefttree,
				   List *hashExprs,
				   List *hashOpfamilies,
				   int numHashSegments)
{
	Motion	   *motion;
	Oid		   *hashFuncs;
	ListCell   *expr_cell;
	ListCell   *opf_cell;
	int			i;

	Assert(numHashSegments > 0);
	Assert(list_length(hashExprs) == list_length(hashOpfamilies));

	/* Look up the right hash functions for the hash expressions */
	hashFuncs = palloc(list_length(hashExprs) * sizeof(Oid));
	i = 0;
	forboth(expr_cell, hashExprs, opf_cell, hashOpfamilies)
	{
		Node	   *expr = lfirst(expr_cell);
		Oid			opfamily = lfirst_oid(opf_cell);
		Oid			typeoid = exprType(expr);

		hashFuncs[i++] = cdb_hashproc_in_opfamily(opfamily, typeoid);
	}

	motion = make_motion(NULL, lefttree,
						 0, NULL, NULL, NULL, NULL /* no ordering */);
	motion->motionType = MOTIONTYPE_HASH;
	motion->hashExprs = hashExprs;
	motion->hashFuncs = hashFuncs;
	motion->numHashSegments = numHashSegments;

	return motion;
}

Motion *
make_broadcast_motion(Plan *lefttree)
{
	Motion	   *motion;

	motion = make_motion(NULL, lefttree,
						 0, NULL, NULL, NULL, NULL /* no ordering */);
	motion->motionType = MOTIONTYPE_BROADCAST;
	motion->hashExprs = NIL;
	motion->hashFuncs = NULL;

	return motion;
}

Plan *
make_explicit_motion(PlannerInfo *root, Plan *lefttree, AttrNumber segidColIdx)
{
	Motion	   *motion;
	plan_tree_base_prefix base;

	base.node = (Node *) root;

	Assert(segidColIdx > 0 && segidColIdx <= list_length(lefttree->targetlist));

	motion = make_motion(NULL, lefttree,
						 0, NULL, NULL, NULL, NULL /* no ordering */);
	motion->motionType = MOTIONTYPE_EXPLICIT;
	motion->hashExprs = NIL;
	motion->hashFuncs = NULL;
	motion->segidColIdx = segidColIdx;

	return (Plan *) motion;
}

/* --------------------------------------------------------------------
 *
 *	Static Helper routines
 * --------------------------------------------------------------------
 */

/* ----------------------------------------------------------------
 * getExprListFromTargetList
 *
 * Creates a VAR that references the indicated entries from the target list,
 * sets the restype and restypmod fields from the target list info,
 * and puts them into a list.
 *
 * The AttrNumber indexes actually refer to the 1 based index into the
 * target list.
 *
 * The entries have the varno field replaced by references in OUTER_VAR.
 * ----------------------------------------------------------------
 */
List *
getExprListFromTargetList(List *tlist,
						  int numCols,
						  AttrNumber *colIdx)
{
	int			i;
	List	   *elist = NIL;

	for (i = 0; i < numCols; i++)
	{
		/* Find expr in TargetList */
		AttrNumber	n = colIdx[i];

		TargetEntry *target = get_tle_by_resno(tlist, n);

		if (target == NULL)
			elog(ERROR, "no tlist entry for key %d", n);

		elist = lappend(elist, copyObject(target->expr));
	}

	return elist;
}

/* ----------------------------------------------------------------------- *
 * cdbmutate_warn_ctid_without_segid() warns the user if the plan refers to a
 * partitioned table's ctid column without also referencing its
 * gp_segment_id column.
 * ----------------------------------------------------------------------- *
 */
typedef struct ctid_inventory_context
{
	plan_tree_base_prefix base; /* Required prefix for
								 * plan_tree_walker/mutator */
	bool		uses_ctid;
	bool		uses_segid;
	Index		relid;
} ctid_inventory_context;

static bool
ctid_inventory_walker(Node *node, ctid_inventory_context *inv)
{
	if (node == NULL)
		return false;

	if (IsA(node, Var))
	{
		Var		   *var = (Var *) node;

		if (var->varattno < 0 &&
			var->varno == inv->relid &&
			var->varlevelsup == 0)
		{
			if (var->varattno == SelfItemPointerAttributeNumber)
				inv->uses_ctid = true;
			else if (var->varattno == GpSegmentIdAttributeNumber)
				inv->uses_segid = true;
		}
		return false;
	}
	return plan_tree_walker(node, ctid_inventory_walker, inv, true);
}

void
cdbmutate_warn_ctid_without_segid(struct PlannerInfo *root, struct RelOptInfo *rel)
{
	ctid_inventory_context inv;
	Relids		relids_to_ignore;
	ListCell   *cell;
	AttrNumber	attno;
	int			ndx;

	planner_init_plan_tree_base(&inv.base, root);
	inv.uses_ctid = false;
	inv.uses_segid = false;
	inv.relid = rel->relid;

	/* Rel not distributed?  Then segment id doesn't matter. */
	if (!rel->cdbpolicy ||
		rel->cdbpolicy->ptype == POLICYTYPE_ENTRY)
		return;

	/* Ignore references occurring in the Query's final output targetlist. */
	relids_to_ignore = bms_make_singleton(0);

	/* Is 'ctid' referenced in join quals? */
	attno = SelfItemPointerAttributeNumber;
	Assert(attno >= rel->min_attr && attno <= rel->max_attr);
	ndx = attno - rel->min_attr;
	if (bms_nonempty_difference(rel->attr_needed[ndx], relids_to_ignore))
		inv.uses_ctid = true;

	/* Is 'gp_segment_id' referenced in join quals? */
	attno = GpSegmentIdAttributeNumber;
	Assert(attno >= rel->min_attr && attno <= rel->max_attr);
	ndx = attno - rel->min_attr;
	if (bms_nonempty_difference(rel->attr_needed[ndx], relids_to_ignore))
		inv.uses_segid = true;

	/* Examine the single-table quals on this rel. */
	foreach(cell, rel->baserestrictinfo)
	{
		RestrictInfo *rinfo = (RestrictInfo *) lfirst(cell);

		Assert(IsA(rinfo, RestrictInfo));

		ctid_inventory_walker((Node *) rinfo->clause, &inv);
	}

	/* Notify client if found a reference to ctid only, without gp_segment_id */
	if (inv.uses_ctid &&
		!inv.uses_segid)
	{
		RangeTblEntry *rte = rt_fetch(rel->relid, root->parse->rtable);
		const char *cmd;
		int			elevel;

		/* Reject if UPDATE or DELETE.  Otherwise just give info msg. */
		switch (root->parse->commandType)
		{
			case CMD_UPDATE:
				cmd = "UPDATE";
				elevel = ERROR;
				break;

			case CMD_DELETE:
				cmd = "DELETE";
				elevel = ERROR;
				break;

			default:
				cmd = "SELECT";
				elevel = NOTICE;
		}

		ereport(elevel,
				(errmsg("%s uses system-defined column \"%s.ctid\" without the necessary companion column \"%s.gp_segment_id\"",
						cmd, rte->eref->aliasname, rte->eref->aliasname),
				 errhint("To uniquely identify a row within a distributed table, use the \"gp_segment_id\" column together with the \"ctid\" column.")));
	}
	bms_free(relids_to_ignore);
}								/* cdbmutate_warn_ctid_without_segid */


/*
 * Code that mutate the tree for share input
 *
 * After the planner, the plan is really a DAG.  SISCs will have valid
 * pointer to the underlying share.  However, other code (setrefs etc)
 * depends on the fact that the plan is a tree.  We first mutate the DAG
 * to a tree.
 *
 * Next, we will need to decide if the share is cross slices.  If the share
 * is not cross slice, we do not need the syncrhonization, and it is possible to
 * keep the Material/Sort in memory to save a sort.
 *
 * It is essential that we walk the tree in the same order as the ExecProcNode start
 * execution, otherwise, deadlock may rise.
 */

/* Walk the tree for shareinput.
 * Shareinput fix shared_as_id and underlying_share_id of nodes in place.  We do not want to use
 * the ordinary tree walker as it is unnecessary to make copies etc.
 */
typedef bool (*SHAREINPUT_MUTATOR) (Node *node, PlannerInfo *root, bool fPop);
static void
shareinput_walker(SHAREINPUT_MUTATOR f, Node *node, PlannerInfo *root)
{
	Plan	   *plan = NULL;
	bool		recursive_down;

	if (node == NULL)
		return;

	if (IsA(node, List))
	{
		List	   *l = (List *) node;
		ListCell   *lc;

		foreach(lc, l)
		{
			Node	   *n = lfirst(lc);

			shareinput_walker(f, n, root);
		}
		return;
	}

	if (!is_plan_node(node))
		return;

	plan = (Plan *) node;
	recursive_down = (*f) (node, root, false);

	if (recursive_down)
	{
		if (IsA(node, Append))
		{
			ListCell   *cell;
			Append	   *app = (Append *) node;

			foreach(cell, app->appendplans)
				shareinput_walker(f, (Node *) lfirst(cell), root);
		}
		else if (IsA(node, ModifyTable))
		{
			ListCell   *cell;
			ModifyTable *mt = (ModifyTable *) node;

			foreach(cell, mt->plans)
				shareinput_walker(f, (Node *) lfirst(cell), root);
		}
		else if (IsA(node, SubqueryScan))
		{
			SubqueryScan  *subqscan = (SubqueryScan *) node;
			PlannerGlobal *glob = root->glob;
			PlannerInfo   *subroot;
			List	      *save_rtable;
			RelOptInfo    *rel;

			/*
			 * If glob->finalrtable is not NULL, rtables have been flatten,
			 * thus we should use glob->finalrtable instead.
			 */
			save_rtable = glob->share.curr_rtable;
			if (root->glob->finalrtable == NULL)
			{
				rel = find_base_rel(root, subqscan->scan.scanrelid);
				/*
				 * The Assert() on RelOptInfo's subplan being
				 * same as the subqueryscan's subplan, is valid
				 * in Upstream but for not for GPDB, since we
				 * create a new copy of the subplan if two
				 * SubPlans refer to the same initplan.
				 */
				subroot = rel->subroot;
				glob->share.curr_rtable = subroot->parse->rtable;
			}
			else
			{
				subroot = root;
				glob->share.curr_rtable = glob->finalrtable;
			}
			shareinput_walker(f, (Node *) subqscan->subplan, subroot);
			glob->share.curr_rtable = save_rtable;
		}
		else if (IsA(node, TableFunctionScan))
		{
			TableFunctionScan  *tfscan = (TableFunctionScan *) node;
			PlannerGlobal *glob = root->glob;
			PlannerInfo   *subroot;
			List	      *save_rtable;
			RelOptInfo    *rel;

			/*
			 * If glob->finalrtable is not NULL, rtables have been flatten,
			 * thus we should use glob->finalrtable instead.
			 */
			save_rtable = glob->share.curr_rtable;
			if (root->glob->finalrtable == NULL)
			{
				rel = find_base_rel(root, tfscan->scan.scanrelid);
				subroot = rel->subroot;
				glob->share.curr_rtable = subroot->parse->rtable;
			}
			else
			{
				subroot = root;
				glob->share.curr_rtable = glob->finalrtable;
			}
			shareinput_walker(f, (Node *)  tfscan->scan.plan.lefttree, subroot);
			glob->share.curr_rtable = save_rtable;
		}
		else if (IsA(node, BitmapAnd))
		{
			ListCell   *cell;
			BitmapAnd  *ba = (BitmapAnd *) node;

			foreach(cell, ba->bitmapplans)
				shareinput_walker(f, (Node *) lfirst(cell), root);
		}
		else if (IsA(node, BitmapOr))
		{
			ListCell   *cell;
			BitmapOr   *bo = (BitmapOr *) node;

			foreach(cell, bo->bitmapplans)
				shareinput_walker(f, (Node *) lfirst(cell), root);
		}
		else if (IsA(node, NestLoop))
		{
			/*
			 * Nest loop join is strange.  Exec order depends on
			 * prefetch_inner
			 */
			NestLoop   *nl = (NestLoop *) node;

			if (nl->join.prefetch_inner)
			{
				shareinput_walker(f, (Node *) plan->righttree, root);
				shareinput_walker(f, (Node *) plan->lefttree, root);
			}
			else
			{
				shareinput_walker(f, (Node *) plan->lefttree, root);
				shareinput_walker(f, (Node *) plan->righttree, root);
			}
		}
		else if (IsA(node, HashJoin))
		{
			/* Hash join the hash table is at inner */
			shareinput_walker(f, (Node *) plan->righttree, root);
			shareinput_walker(f, (Node *) plan->lefttree, root);
		}
		else if (IsA(node, MergeJoin))
		{
			MergeJoin  *mj = (MergeJoin *) node;

			if (mj->unique_outer)
			{
				shareinput_walker(f, (Node *) plan->lefttree, root);
				shareinput_walker(f, (Node *) plan->righttree, root);
			}
			else
			{
				shareinput_walker(f, (Node *) plan->righttree, root);
				shareinput_walker(f, (Node *) plan->lefttree, root);
			}
		}
		else if (IsA(node, Sequence))
		{
			ListCell   *cell = NULL;
			Sequence   *sequence = (Sequence *) node;

			foreach(cell, sequence->subplans)
			{
				shareinput_walker(f, (Node *) lfirst(cell), root);
			}
		}
		else
		{
			shareinput_walker(f, (Node *) plan->lefttree, root);
			shareinput_walker(f, (Node *) plan->righttree, root);
			shareinput_walker(f, (Node *) plan->initPlan, root);
		}
	}

	(*f) (node, root, true);
}

/*
 * Create a fake CTE range table entry that reflects the target list of a
 * shared input.
 */
static RangeTblEntry *
create_shareinput_producer_rte(ApplyShareInputContext *ctxt, int share_id,
							   int refno)
{
	int			attno = 1;
	ListCell   *lc;
	Plan	   *subplan;
	char		buf[100];
	RangeTblEntry *rte;
	List	   *colnames = NIL;
	List	   *coltypes = NIL;
	List	   *coltypmods = NIL;
	List	   *colcollations = NIL;

	Assert(ctxt->shared_plans);
	Assert(ctxt->shared_input_count > share_id);
	subplan = ctxt->shared_plans[share_id];

	foreach(lc, subplan->targetlist)
	{
		TargetEntry *tle = (TargetEntry *) lfirst(lc);
		Oid			vartype;
		int32		vartypmod;
		Oid			varcollid;
		char	   *resname;

		vartype = exprType((Node *) tle->expr);
		vartypmod = exprTypmod((Node *) tle->expr);
		varcollid = exprCollation((Node *) tle->expr);

		/*
		 * We should've filled in tle->resname in shareinput_save_producer().
		 * Note that it's too late to call get_tle_name() here, because this
		 * runs after all the varnos in Vars have already been changed to
		 * INNER_VAR/OUTER_VAR.
		 */
		resname = tle->resname;
		if (!resname)
			resname = pstrdup("unnamed_attr");

		colnames = lappend(colnames, makeString(resname));
		coltypes = lappend_oid(coltypes, vartype);
		coltypmods = lappend_int(coltypmods, vartypmod);
		colcollations = lappend_oid(colcollations, varcollid);
		attno++;
	}

	/*
	 * Create a new RTE. Note that we use a different RTE for each reference,
	 * because we want to give each reference a different name.
	 */
	snprintf(buf, sizeof(buf), "share%d_ref%d", share_id, refno);

	rte = makeNode(RangeTblEntry);
	rte->rtekind = RTE_CTE;
	rte->ctename = pstrdup(buf);
	rte->ctelevelsup = 0;
	rte->self_reference = false;
	rte->alias = NULL;

	rte->eref = makeAlias(rte->ctename, colnames);
	rte->coltypes = coltypes;
	rte->coltypmods = coltypmods;
	rte->colcollations = colcollations;

	rte->inh = false;
	rte->inFromCl = false;

	rte->requiredPerms = 0;
	rte->checkAsUser = InvalidOid;

	return rte;
}

/*
 * Memorize the shared plan of a shared input in an array, one per share_id.
 */
static void
shareinput_save_producer(ShareInputScan *plan, ApplyShareInputContext *ctxt)
{
	int			share_id = plan->share_id;
	int			new_shared_input_count = (share_id + 1);

	Assert(plan->share_id >= 0);

	if (ctxt->shared_plans == NULL)
	{
		ctxt->shared_plans = palloc0(sizeof(Plan *) * new_shared_input_count);
		ctxt->shared_input_count = new_shared_input_count;
	}
	else if (ctxt->shared_input_count < new_shared_input_count)
	{
		ctxt->shared_plans = repalloc(ctxt->shared_plans, new_shared_input_count * sizeof(Plan *));
		memset(&ctxt->shared_plans[ctxt->shared_input_count], 0, (new_shared_input_count - ctxt->shared_input_count) * sizeof(Plan *));
		ctxt->shared_input_count = new_shared_input_count;
	}

	Assert(ctxt->shared_plans[share_id] == NULL);
	ctxt->shared_plans[share_id] = plan->scan.plan.lefttree;
}

/*
 * When a plan comes out of the planner, all the ShareInputScan nodes belonging
 * to the same "share" have the same child node. apply_shareinput_dag_to_tree()
 * turns the DAG into a proper tree. The first occurrence of a ShareInput scan,
 * with a particular child tree, becomes the "producer" of the share, and the
 * others becomes consumers. The subtree is removed from all the consumer nodes.
 *
 * Also, a share_id is assigned to each ShareInputScan node, as well as the
 * Material/Sort nodes below the producers. The producers and its consumers
 * are linked together by the same share_id.
 */
static bool
shareinput_mutator_dag_to_tree(Node *node, PlannerInfo *root, bool fPop)
{
	PlannerGlobal *glob = root->glob;
	ApplyShareInputContext *ctxt = &glob->share;
	Plan	   *plan = (Plan *) node;

	if (fPop)
		return true;

	if (IsA(plan, ShareInputScan))
	{
		ShareInputScan *siscan = (ShareInputScan *) plan;
		Plan	   *subplan = plan->lefttree;
		int			share_id;
		int			attno;
		ListCell   *lc;

		/* on entry, all ShareInputScans should have a child */
		Assert(subplan);

		/* Is there a producer for this sub-tree already? */
		for (share_id = 0; share_id < ctxt->shared_input_count; share_id++)
		{
			if (ctxt->shared_plans[share_id] == subplan)
			{
				/*
				 * Yes. This is a consumer. Remove the subtree, and assign the
				 * same share_id as the producer.
				 */
				siscan->share_id = share_id;
				siscan->scan.plan.lefttree = NULL;
				return false;
			}
		}

		/*
		 * Couldn't find a match in existing list of producers, so this is a
		 * producer. Add this to the list of producers, and assign a new
		 * share_id.
		 */
		siscan->share_id = share_id;

		shareinput_save_producer(siscan, ctxt);

		/*
		 * Also make sure that all the entries in the subplan's target list
		 * have human-readable column names. They are used for EXPLAIN.
		 */
		attno = 1;
		foreach(lc, subplan->targetlist)
		{
			TargetEntry *tle = (TargetEntry *) lfirst(lc);

			if (tle->resname == NULL)
			{
				char		default_name[100];
				char	   *resname;

				snprintf(default_name, sizeof(default_name), "col_%d", attno);

				resname = strVal(get_tle_name(tle, ctxt->curr_rtable, default_name));
				tle->resname = pstrdup(resname);
			}
			attno++;
		}
	}

	return true;
}

Plan *
apply_shareinput_dag_to_tree(PlannerInfo *root, Plan *plan)
{
	PlannerGlobal *glob = root->glob;

	glob->share.curr_rtable = root->parse->rtable;
	shareinput_walker(shareinput_mutator_dag_to_tree, (Node *) plan, root);
	return plan;
}

/*
 * Collect all the producer ShareInput nodes into an array, for later use by
 * replace_shareinput_targetlists().
 *
 * This is a stripped-down version of apply_shareinput_dag_to_tree(), for use
 * on ORCA-produced plans. ORCA assigns share_ids to all ShareInputScan nodes,
 * and only producer nodes have a subtree, so we don't need to do the DAG to
 * tree conversion or assign share_ids here.
 */
static bool
collect_shareinput_producers_walker(Node *node, PlannerInfo *root, bool fPop)
{
	PlannerGlobal *glob = root->glob;
	ApplyShareInputContext *ctxt = &glob->share;

	if (fPop)
		return true;

	if (IsA(node, ShareInputScan))
	{
		ShareInputScan *siscan = (ShareInputScan *) node;
		Plan	   *subplan = siscan->scan.plan.lefttree;

		Assert(siscan->share_id >= 0);

		if (subplan)
			shareinput_save_producer(siscan, ctxt);
	}
	return true;
}

void
collect_shareinput_producers(PlannerInfo *root, Plan *plan)
{
	PlannerGlobal *glob = root->glob;

	glob->share.curr_rtable = glob->finalrtable;
	shareinput_walker(collect_shareinput_producers_walker, (Node *) plan, root);
}

/* Some helper: implements a stack using List. */
static void
shareinput_pushmot(ApplyShareInputContext *ctxt, int motid)
{
	ctxt->motStack = lcons_int(motid, ctxt->motStack);
}
static void
shareinput_popmot(ApplyShareInputContext *ctxt)
{
	list_delete_first(ctxt->motStack);
}
static int
shareinput_peekmot(ApplyShareInputContext *ctxt)
{
	return linitial_int(ctxt->motStack);
}


/*
 * Replace the target list of ShareInputScan nodes, with references
 * to CTEs that we build on the fly.
 *
 * Only one of the ShareInputScan nodes in a plan tree contains the real
 * child plan, while others contain just a "share id" that binds all the
 * ShareInputScan nodes sharing the same input together. The missing
 * child plan is a problem for EXPLAIN, as any OUTER Vars in the
 * ShareInputScan's target list cannot be resolved without the child
 * plan.
 *
 * To work around that issue, create a CTE for each shared input node, with
 * columns that match the target list of the SharedInputScan's subplan,
 * and replace the target list entries of the SharedInputScan with
 * Vars that point to the CTE instead of the child plan.
 */
Plan *
replace_shareinput_targetlists(PlannerInfo *root, Plan *plan)
{
	shareinput_walker(replace_shareinput_targetlists_walker, (Node *) plan, root);
	return plan;
}

static bool
replace_shareinput_targetlists_walker(Node *node, PlannerInfo *root, bool fPop)
{
	PlannerGlobal *glob = root->glob;
	ApplyShareInputContext *ctxt = &glob->share;

	if (fPop)
		return true;

	if (IsA(node, ShareInputScan))
	{
		ShareInputScan *sisc = (ShareInputScan *) node;
		int			share_id = sisc->share_id;
		ListCell   *lc;
		int			attno;
		List	   *newtargetlist;
		RangeTblEntry *rte;

		/*
		 * Note that even though the planner assigns sequential share_ids for
		 * each shared node, so that share_id is always below
		 * list_length(ctxt->sharedNodes), ORCA has a different assignment
		 * scheme. So we have to be prepared for any share_id, at least when
		 * ORCA is in use.
		 */
		if (ctxt->share_refcounts == NULL)
		{
			int			new_sz = share_id + 1;

			ctxt->share_refcounts = palloc0(new_sz * sizeof(int));
			ctxt->share_refcounts_sz = new_sz;
		}
		else if (share_id >= ctxt->share_refcounts_sz)
		{
			int			old_sz = ctxt->share_refcounts_sz;
			int			new_sz = share_id + 1;

			ctxt->share_refcounts = repalloc(ctxt->share_refcounts, new_sz * sizeof(int));
			memset(&ctxt->share_refcounts[old_sz], 0, (new_sz - old_sz) * sizeof(int));
			ctxt->share_refcounts_sz = new_sz;
		}

		ctxt->share_refcounts[share_id]++;

		/*
		 * Create a new RTE. Note that we use a different RTE for each
		 * reference, because we want to give each reference a different name.
		 */
		rte = create_shareinput_producer_rte(ctxt, share_id,
											 ctxt->share_refcounts[share_id]);

		glob->finalrtable = lappend(glob->finalrtable, rte);
		sisc->scan.scanrelid = list_length(glob->finalrtable);

		/*
		 * Replace all the target list entries.
		 *
		 * SharedInputScan nodes are not projection-capable, so the target
		 * list of the SharedInputScan matches the subplan's target list.
		 */
		newtargetlist = NIL;
		attno = 1;
		foreach(lc, sisc->scan.plan.targetlist)
		{
			TargetEntry *tle = (TargetEntry *) lfirst(lc);
			TargetEntry *newtle = flatCopyTargetEntry(tle);

			newtle->expr = (Expr *) makeVar(sisc->scan.scanrelid, attno,
											exprType((Node *) tle->expr),
											exprTypmod((Node *) tle->expr),
											exprCollation((Node *) tle->expr),
											0);
			newtargetlist = lappend(newtargetlist, newtle);
			attno++;
		}
		sisc->scan.plan.targetlist = newtargetlist;
	}

	return true;
}

/*
 * First walk on shareinput xslice. Collect information about the producer
 * and consumer slice IDs for each share. It also builds a list of shares
 * that should run in the QD.
 */
static bool
shareinput_mutator_xslice_1(Node *node, PlannerInfo *root, bool fPop)
{
	PlannerGlobal *glob = root->glob;
	ApplyShareInputContext *ctxt = &glob->share;
	Plan	   *plan = (Plan *) node;

	if (fPop)
	{
		if (IsA(plan, Motion))
			shareinput_popmot(ctxt);
		return false;
	}

	if (IsA(plan, Motion))
	{
		Motion	   *motion = (Motion *) plan;

		shareinput_pushmot(ctxt, motion->motionID);
		return true;
	}

	if (IsA(plan, ShareInputScan))
	{
		ShareInputScan *sisc = (ShareInputScan *) plan;
		int			motId = shareinput_peekmot(ctxt);
		Plan	   *shared = plan->lefttree;
		PlanSlice  *currentSlice;
		ApplyShareInputContextPerShare *share_info;

		share_info = &ctxt->shared_inputs[sisc->share_id];

		currentSlice = &ctxt->slices[motId];
		if (currentSlice->gangType == GANGTYPE_UNALLOCATED ||
			currentSlice->gangType == GANGTYPE_ENTRYDB_READER)
		{
			ctxt->qdShares = bms_add_member(ctxt->qdShares, sisc->share_id);
		}

		/* Remember information about the slice that this instance appears in. */
		if (shared)
			ctxt->shared_inputs[sisc->share_id].producer_slice_id = motId;
		share_info->participant_slices = bms_add_member(share_info->participant_slices, motId);

		sisc->this_slice_id = motId;
	}

	return true;
}

/*
 * Second pass:
 * 1. Mark shareinput scans with multiple consumer slices as cross-slice.
 * 2. Fill 'share_type' and 'share_id' fields in the shared Material/Sort nodes.
 */
static bool
shareinput_mutator_xslice_2(Node *node, PlannerInfo *root, bool fPop)
{
	PlannerGlobal *glob = root->glob;
	ApplyShareInputContext *ctxt = &glob->share;
	Plan	   *plan = (Plan *) node;

	if (fPop)
	{
		if (IsA(plan, Motion))
			shareinput_popmot(ctxt);
		return false;
	}

	if (IsA(plan, Motion))
	{
		Motion	   *motion = (Motion *) plan;

		shareinput_pushmot(ctxt, motion->motionID);
		return true;
	}

	if (IsA(plan, ShareInputScan))
	{
		ShareInputScan *sisc = (ShareInputScan *) plan;
		int			motId = shareinput_peekmot(ctxt);
		ApplyShareInputContextPerShare *pershare;

		pershare = &ctxt->shared_inputs[sisc->share_id];

		if (bms_num_members(pershare->participant_slices) > 1)
		{
			Assert(!sisc->cross_slice);
			sisc->cross_slice = true;
		}

		sisc->producer_slice_id = pershare->producer_slice_id;
		sisc->nconsumers = bms_num_members(pershare->participant_slices) - 1;

		/*
		 * If this share needs to run in the QD, mark the slice accordingly.
		 */
		if (bms_is_member(sisc->share_id, ctxt->qdShares))
		{
			PlanSlice  *currentSlice = &ctxt->slices[motId];

			switch (currentSlice->gangType)
			{
				case GANGTYPE_UNALLOCATED:
				case GANGTYPE_ENTRYDB_READER:
					break;
				case GANGTYPE_SINGLETON_READER:
					currentSlice->gangType = GANGTYPE_ENTRYDB_READER;
					break;
				case GANGTYPE_PRIMARY_READER:
				case GANGTYPE_PRIMARY_WRITER:
					elog(ERROR, "cannot share ShareInputScan between QD and primary reader/write gang");
					break;
			}
		}
	}
	return true;
}

/*
 * Scan through the plan tree and make note of which Share Input Scans
 * are cross-slice.
 */
Plan *
apply_shareinput_xslice(Plan *plan, PlannerInfo *root)
{
	PlannerGlobal *glob = root->glob;
	ApplyShareInputContext *ctxt = &glob->share;
	ListCell   *lp, *lr;
	int			subplan_id;

	/*
	 * If the plan tree has only one slice, there cannot be any cross-slice
	 * Share Input Scans. They were all marked as cross_slice=false when they
	 * were created. Note that we won't set slice_ids on them correctly;
	 * the executor knows not to expect that when numSlices == 1.
	 */
	if (root->glob->numSlices == 1)
		return plan;

	ctxt->motStack = NULL;
	ctxt->qdShares = NULL;
	ctxt->slices = root->glob->slices;

	ctxt->shared_inputs = palloc0(ctxt->shared_input_count * sizeof(ApplyShareInputContextPerShare));

	shareinput_pushmot(ctxt, 0);

	/*
	 * Walk the tree.  See comment for each pass for what each pass will do.
	 * The context is used to carry information from one pass to another, as
	 * well as within a pass.
	 */

	/*
	 * A subplan might have a SharedScan consumer while the SharedScan
	 * producer is in the main plan, or vice versa. So in the first pass, we
	 * walk through all plans and collect all producer subplans into the
	 * context, before processing the consumers.
	 */
	subplan_id = 0;
	forboth(lp, glob->subplans, lr, glob->subroots)
	{
		Plan	   *subplan = (Plan *) lfirst(lp);
		PlannerInfo *subroot =  (PlannerInfo *) lfirst(lr);
		int			slice_id = glob->subplan_sliceIds[subplan_id];

		shareinput_pushmot(ctxt, slice_id);
		shareinput_walker(shareinput_mutator_xslice_1, (Node *) subplan, subroot);
		shareinput_popmot(ctxt);
		subplan_id++;
	}
	shareinput_walker(shareinput_mutator_xslice_1, (Node *) plan, root);

	/* Now walk the tree again, and process all the consumers. */
	subplan_id = 0;
	forboth(lp, glob->subplans, lr, glob->subroots)
	{
		Plan	   *subplan = (Plan *) lfirst(lp);
		PlannerInfo *subroot =  (PlannerInfo *) lfirst(lr);
		int			slice_id = glob->subplan_sliceIds[subplan_id];

		shareinput_pushmot(ctxt, slice_id);
		shareinput_walker(shareinput_mutator_xslice_2, (Node *) subplan, subroot);
		shareinput_popmot(ctxt);
		subplan_id++;
	}
	shareinput_walker(shareinput_mutator_xslice_2, (Node *) plan, root);

	return plan;
}

/*
 * Hash a list of const values with GPDB's hash function
 */
int32
cdbhash_const_list(List *plConsts, int iSegments, Oid *hashfuncs)
{
	ListCell   *lc;
	CdbHash    *pcdbhash;
	int			i;

	Assert(0 < list_length(plConsts));

	pcdbhash = makeCdbHash(iSegments, list_length(plConsts), hashfuncs);

	cdbhashinit(pcdbhash);

	Assert(0 < list_length(plConsts));

	i = 0;
	foreach(lc, plConsts)
	{
		Const	   *pconst = (Const *) lfirst(lc);

		cdbhash(pcdbhash, i + 1, pconst->constvalue, pconst->constisnull);
		i++;
	}

	return cdbhashreduce(pcdbhash);
}

/*
 * Construct an expression that checks whether the current segment is
 * 'segid'.
 */
Node *
makeSegmentFilterExpr(int segid)
{
	/* Build an expression: gp_execution_segment() = <segid> */
	return (Node *)
		make_opclause(Int4EqualOperator,
					  BOOLOID,
					  false,	/* opretset */
					  (Expr *) makeFuncExpr(F_MPP_EXECUTION_SEGMENT,
											INT4OID,
											NIL,	/* args */
											InvalidOid,
											InvalidOid,
											COERCE_EXPLICIT_CALL),
					  (Expr *) makeConst(INT4OID,
										 -1,		/* consttypmod */
										 InvalidOid, /* constcollid */
										 sizeof(int32),
										 Int32GetDatum(segid),
										 false,		/* constisnull */
										 true),		/* constbyval */
					  InvalidOid,	/* opcollid */
					  InvalidOid	/* inputcollid */
			);
}

typedef struct ParamWalkerContext
{
	plan_tree_base_prefix base; /* Required prefix for
								 * plan_tree_walker/mutator */
	Bitmapset  *paramids;		/* Bitmapset for Param */
	Bitmapset  *scanrelids;		/* Bitmapset for scanrelid */
} ParamWalkerContext;

static bool
param_walker(Node *node, ParamWalkerContext *context)
{
	PlannerInfo *root = (PlannerInfo *) context->base.node;
	Param	   *param;
	Aggref	   *aggref;
	Scan	   *scan;
	ListCell   *lc;

	if (node == NULL)
		return false;

	switch (nodeTag(node))
	{
		case T_Param:
			param = (Param *) node;
			if (param->paramkind == PARAM_EXEC)
			{
				if (!bms_is_member(param->paramid, context->paramids))
					context->paramids = bms_add_member(context->paramids,
													   param->paramid);
			}
			return false;

		case T_Aggref:
			/*
			 * See if it's an Aggref that will be replaced by a Param in
			 * set_plan_references()
			 */
			aggref = (Aggref *) node;
			if (root->minmax_aggs != NIL &&
				list_length(aggref->args) == 1)
			{
				TargetEntry *curTarget = (TargetEntry *) linitial(aggref->args);
				ListCell   *lc;

				foreach(lc, root->minmax_aggs)
				{
					MinMaxAggInfo *mminfo = (MinMaxAggInfo *) lfirst(lc);

					if (mminfo->aggfnoid == aggref->aggfnoid &&
						equal(mminfo->target, curTarget->expr))
						param_walker((Node *) mminfo->param, context);
				}
			}
			break;

		case T_ValuesScan:
		case T_FunctionScan:
		case T_TableFunctionScan:
			scan = (Scan *) node;
			if (!bms_is_member(scan->scanrelid, context->scanrelids))
				context->scanrelids = bms_add_member(context->scanrelids,
													 scan->scanrelid);
			break;

		case T_SubPlan:
			{
				PlannerInfo *root = (PlannerInfo *) context->base.node;
				SubPlan	   *spexpr = (SubPlan *) node;
				Plan	   *subplan_plan = planner_subplan_get_plan(root, spexpr);
				PlannerInfo *subplan_root = planner_subplan_get_root(root, spexpr);
				Node	   *save_root;

				if (spexpr->subLinkType == MULTIEXPR_SUBLINK &&
					spexpr->is_initplan)
				{
					foreach (lc, spexpr->setParam)
					{
						int			paramid = lfirst_int(lc);

						if (!bms_is_member(paramid, context->paramids))
							context->paramids = bms_add_member(context->paramids,
															   paramid);
					}
				}

				/* recurse into the subplan */
				save_root = context->base.node;
				context->base.node = (Node *) subplan_root;
				if (param_walker((Node *) subplan_plan, context))
					return true;

				context->base.node = save_root;

				/*
				 * fall through to let plan_tree_walker() handle any expressions in
				 * testexpr and args
				 */
			}
			break;

		default:
			break;
	}

	return plan_tree_walker(node, param_walker, context, false);
}

/*
 * Retrieve param ids that are referenced in RTEs.
 * We can't simply use range_table_walker() here, because we only
 * want to walk through RTEs that are referenced in the plan.
 */
static void
rte_param_walker(List *rtable, ParamWalkerContext *context)
{
	ListCell   *lc;
	int			rteid = 0;
	ListCell   *func_lc;

	foreach(lc, rtable)
	{
		rteid++;
		RangeTblEntry *rte = (RangeTblEntry *) lfirst(lc);

		if (!bms_is_member(rteid, context->scanrelids))
		{
			/* rte not referenced in the plan */
			continue;
		}

		switch (rte->rtekind)
		{
			case RTE_RELATION:
			case RTE_VOID:
			case RTE_CTE:
			case RTE_RESULT:
			case RTE_NAMEDTUPLESTORE:
				/* nothing to do */
				break;
			case RTE_SUBQUERY:
				param_walker((Node *) rte->subquery, context);
				break;
			case RTE_JOIN:
				param_walker((Node *) rte->joinaliasvars, context);
				break;
			case RTE_FUNCTION:
				foreach(func_lc, rte->functions)
				{
					RangeTblFunction *rtfunc = (RangeTblFunction *) lfirst(func_lc);
					param_walker(rtfunc->funcexpr, context);
				}
				break;
			case RTE_TABLEFUNCTION:
				param_walker((Node *) rte->subquery, context);
				foreach(func_lc, rte->functions)
				{
					RangeTblFunction *rtfunc = (RangeTblFunction *) lfirst(func_lc);
					param_walker(rtfunc->funcexpr, context);
				}
				break;
			case RTE_TABLEFUNC:
				param_walker((Node *) rte->tablefunc, context);
				break;
			case RTE_VALUES:
				param_walker((Node *) rte->values_lists, context);
				break;
		}
	}
}

static bool
initplan_walker(Node *node, ParamWalkerContext *context)
{
	PlannerInfo *root;
	List	   *new_initplans = NIL;
	ListCell   *lc,
			   *lp;
	bool		anyused;		/* Are any of this Init Plan's output
								 * parameters actually used? */

	if (node == NULL)
		return false;

	if (is_plan_node(node))
	{
		root = (PlannerInfo *) context->base.node;
		Plan	   *plan = (Plan *) node;

		foreach(lc, plan->initPlan)
		{
			SubPlan    *subplan = (SubPlan *) lfirst(lc);

			Assert(subplan->is_initplan);

			anyused = false;
			foreach(lp, subplan->setParam)
			{
				int			paramid = lfirst_int(lp);

				if (bms_is_member(paramid, context->paramids))
				{
					anyused = true;
					break;
				}
			}

			/* If none of its params are used, leave out from the new list */
			if (anyused)
				new_initplans = lappend(new_initplans, subplan);
			else
			{
				/*
				 * This init plan is unused. Leave it out of this plan node's
				 * initPlan list, and also replace it in the global list of
				 * subplans with a dummy. (We can't just remove it from the
				 * global list, because that would screw up the plan_id
				 * numbering of the subplans).
				 */
				Result	   *dummy = make_result(NIL,
												(Node *) list_make1(makeBoolConst(false, false)),
												NULL);

				planner_subplan_put_plan(root, subplan, (Plan *) dummy);
			}
		}

		/* remove unused params */
		plan->allParam = bms_intersect(plan->allParam, context->paramids);

		list_free(plan->initPlan);
		plan->initPlan = new_initplans;
	}

	return plan_tree_walker(node, initplan_walker, context, true);
}

/*
 * Remove unused initplans from the given plan object
 */
void
remove_unused_initplans(Plan *top_plan, PlannerInfo *root)
{
	ParamWalkerContext context;

	if (!root->glob->subplans)
		return;

	context.base.node = (Node *) root;
	context.paramids = NULL;
	context.scanrelids = NULL;

	/*
	 * Collect param ids of all the Params that are referenced in the plan,and
	 * the IDs of all the range table entries that are referenced in the Plan.
	 */
	param_walker((Node *) top_plan, &context);

	/*
	 * Now that we know which range table entries are referenced in the plan,
	 * also collect Params from those range table entries.
	 */
	rte_param_walker(root->glob->finalrtable, &context);

	/* workhorse to remove unused initplans */
	initplan_walker((Node *) top_plan, &context);

	bms_free(context.paramids);
	bms_free(context.scanrelids);
}

/*
 * Evaluate functions to constants.
 */
Node *
exec_make_plan_constant(struct PlannedStmt *stmt, EState *estate, bool is_SRI,
						List **cursorPositions)
{
	pre_dispatch_function_evaluation_context pcontext;
	Node	   *result;

	Assert(stmt);
	exec_init_plan_tree_base(&pcontext.base, stmt);
	pcontext.single_row_insert = is_SRI;
	pcontext.cursorPositions = NIL;
	pcontext.estate = estate;

	result = pre_dispatch_function_evaluation_mutator((Node *) stmt->planTree, &pcontext);

	*cursorPositions = pcontext.cursorPositions;
	return result;
}

/*
 * Remove subquery field in RTE's with subquery kind.
 */
void
remove_subquery_in_RTEs(Node *node)
{
	if (node == NULL)
	{
		return;
	}

	if (IsA(node, RangeTblEntry))
	{
		RangeTblEntry *rte = (RangeTblEntry *) node;

		if (RTE_SUBQUERY == rte->rtekind && NULL != rte->subquery)
		{
			/*
			 * Replace subquery with a dummy subquery.
			 *
			 * XXX: We could save a lot more memory by deep-freeing the many
			 * fields in the Query too. But I'm not sure which of them might
			 * be shared by other objects in the tree.
			 */
			pfree(rte->subquery);
			rte->subquery = NULL;
		}

		return;
	}

	if (IsA(node, List))
	{
		List	   *list = (List *) node;
		ListCell   *lc = NULL;

		foreach(lc, list)
		{
			remove_subquery_in_RTEs((Node *) lfirst(lc));
		}
	}
}

/*
 * Let's evaluate all STABLE functions that have constant args before
 * dispatch, so we get a consistent view across QEs
 *
 * Also, if this is a single_row insert, let's evaluate nextval() and
 * currval() before dispatching
 */
static Node *
pre_dispatch_function_evaluation_mutator(Node *node,
										 pre_dispatch_function_evaluation_context *context)
{
	Node	   *new_node = 0;

	if (node == NULL)
		return NULL;

	if (IsA(node, Param))
	{
		Param	   *param = (Param *) node;

		/* Not replaceable, so just copy the Param (no need to recurse) */
		return (Node *) copyObject(param);
	}
	else if (IsA(node, FuncExpr))
	{
		FuncExpr   *expr = (FuncExpr *) node;
		List	   *args;
		ListCell   *arg;
		Expr	   *simple;
		FuncExpr   *newexpr;
		bool		has_nonconst_input;

		Form_pg_proc funcform;
		EState	   *estate;
		ExprState  *exprstate;
		MemoryContext oldcontext;
		Datum		const_val;
		bool		const_is_null;
		int16		resultTypLen;
		bool		resultTypByVal;

		Oid			funcid;
		HeapTuple	func_tuple;

		/*
		 * Reduce constants in the FuncExpr's arguments. We know args is
		 * either NIL or a List node, so we can call expression_tree_mutator
		 * directly rather than recursing to self.
		 */
		args = (List *) expression_tree_mutator((Node *) expr->args,
												pre_dispatch_function_evaluation_mutator,
												(void *) context);

		funcid = expr->funcid;

		newexpr = makeNode(FuncExpr);
		newexpr->funcid = expr->funcid;
		newexpr->funcresulttype = expr->funcresulttype;
		newexpr->funcretset = expr->funcretset;
		newexpr->funcvariadic = expr->funcvariadic;
		newexpr->funcformat = expr->funcformat;
		newexpr->funccollid = expr->funccollid;
		newexpr->inputcollid = expr->inputcollid;
		newexpr->args = args;
		newexpr->location = expr->location;
		newexpr->is_tablefunc = expr->is_tablefunc;

		/*
		 * Check for constant inputs
		 */
		has_nonconst_input = false;

		foreach(arg, args)
		{
			if (!IsA(lfirst(arg), Const))
			{
				has_nonconst_input = true;
				break;
			}
		}

		if (!has_nonconst_input)
		{
			bool		is_seq_func = false;
			bool		tup_or_set;

			func_tuple = SearchSysCache1(PROCOID, ObjectIdGetDatum(funcid));
			if (!HeapTupleIsValid(func_tuple))
				elog(ERROR, "cache lookup failed for function %u", funcid);

			funcform = (Form_pg_proc) GETSTRUCT(func_tuple);

			/* can't handle set returning or row returning functions */
			tup_or_set = (funcform->proretset || type_is_rowtype(funcform->prorettype));

			ReleaseSysCache(func_tuple);

			/* can't handle it */
			if (tup_or_set)
			{
				/*
				 * We haven't mutated this node, but we still return the
				 * mutated arguments.
				 *
				 * If we don't do this, we'll miss out on transforming
				 * function arguments which are themselves functions we need
				 * to mutated. For example, select foo(now()).
				 */
				return (Node *) newexpr;
			}

			/*
			 * Here we want to mark any statement that is going to use a
			 * sequence as dirty.  Doing this means that the QD will flush the
			 * xlog which will also flush any xlog writes that the sequence
			 * server might do.
			 */
			if (funcid == F_NEXTVAL_OID || funcid == F_CURRVAL_OID ||
				funcid == F_SETVAL_OID)
			{
				ExecutorMarkTransactionUsesSequences();
				is_seq_func = true;
			}

			if (funcform->provolatile == PROVOLATILE_IMMUTABLE)
				 /* okay */ ;
			else if (funcform->provolatile == PROVOLATILE_STABLE)
				 /* okay */ ;
			else if (context->single_row_insert && is_seq_func)
				;				/* Volatile, but special sequence function */
			else
				return (Node *) newexpr;

			/*
			 * Ok, we have a function that is STABLE (or IMMUTABLE), with
			 * constant args. Let's try to evaluate it.
			 */

			/*
			 * To use the executor, we need an EState.
			 */
			estate = CreateExecutorState();

			/* We can use the estate's working context to avoid memory leaks. */
			oldcontext = MemoryContextSwitchTo(estate->es_query_cxt);

			/*
			 * Prepare expr for execution.
			 */
			exprstate = ExecPrepareExpr((Expr *) newexpr, estate);

			/*
			 * And evaluate it.
			 *
			 * It is OK to use a default econtext because none of the
			 * ExecEvalExpr() code used in this situation will use econtext.
			 * That might seem fortuitous, but it's not so unreasonable --- a
			 * constant expression does not depend on context, by definition,
			 * n'est-ce pas?
			 */
			const_val = ExecEvalExprSwitchContext(exprstate,
												  GetPerTupleExprContext(estate),
												  &const_is_null);

			/* Get info needed about result datatype */
			get_typlenbyval(expr->funcresulttype, &resultTypLen, &resultTypByVal);

			/* Get back to outer memory context */
			MemoryContextSwitchTo(oldcontext);

			/* Must copy result out of sub-context used by expression eval */
			if (!const_is_null)
				const_val = datumCopy(const_val, resultTypByVal, resultTypLen);

			/* Release all the junk we just created */
			FreeExecutorState(estate);

			/*
			 * Make the constant result node.
			 */
			simple = (Expr *) makeConst(expr->funcresulttype,
										-1,
										expr->funccollid,
										resultTypLen,
										const_val, const_is_null,
										resultTypByVal);

			/* successfully simplified it */
			if (simple)
				return (Node *) simple;
		}

		/*
		 * The expression cannot be simplified any further, so build and
		 * return a replacement FuncExpr node using the possibly-simplified
		 * arguments.
		 */
		return (Node *) newexpr;
	}
	else if (IsA(node, OpExpr))
	{
		OpExpr	   *expr = (OpExpr *) node;
		List	   *args;

		OpExpr	   *newexpr;

		/*
		 * Reduce constants in the OpExpr's arguments.  We know args is either
		 * NIL or a List node, so we can call expression_tree_mutator directly
		 * rather than recursing to self.
		 */
		args = (List *) expression_tree_mutator((Node *) expr->args,
												pre_dispatch_function_evaluation_mutator,
												(void *) context);

		/*
		 * Need to get OID of underlying function.	Okay to scribble on input
		 * to this extent.
		 */
		set_opfuncid(expr);

		newexpr = makeNode(OpExpr);
		newexpr->opno = expr->opno;
		newexpr->opfuncid = expr->opfuncid;
		newexpr->opresulttype = expr->opresulttype;
		newexpr->opretset = expr->opretset;
		newexpr->opcollid = expr->opcollid;
		newexpr->inputcollid = expr->inputcollid;
		newexpr->args = args;
		newexpr->location = expr->location;

		return (Node *) newexpr;
	}
	else if (IsA(node, CurrentOfExpr))
	{
		/*
		 * updatable cursors
		 *
		 * During constant folding, we collect the current position of each
		 * cursor mentioned in the plan into a list, and dispatch them to the
		 * QEs.
		 *
		 * We should not get here if called from planner_make_plan_constant().
		 * That is only used for simple Result plans, which should not contain
		 * CURRENT OF expressions.
		 */
		if (context->estate)
		{
			CurrentOfExpr *expr = (CurrentOfExpr *) node;
			CursorPosInfo *cpos;

			cpos = makeNode(CursorPosInfo);

			getCurrentOf(expr,
						 GetPerTupleExprContext(context->estate),
						 expr->target_relid,
						 &cpos->ctid,
						 &cpos->gp_segment_id,
						 &cpos->table_oid,
						 &cpos->cursor_name);

			context->cursorPositions = lappend(context->cursorPositions, cpos);
		}
	}

	/*
	 * For any node type not handled above, we recurse using
	 * plan_tree_mutator, which will copy the node unchanged but try to
	 * simplify its arguments (if any) using this routine.
	 */
	new_node = plan_tree_mutator(node,
								 pre_dispatch_function_evaluation_mutator,
								 (void *) context,
								 true);

	return new_node;
}

/*
 * cdbpathtoplan_create_sri_path
 *
 * Optimize for single-row-insertion for const result. If result is const, and
 * result relation is partitioned, we could decide partition relation during
 * plan time and replace targetPolicy by partition relation's targetPolicy.
 * In addition, we don't need tuple distribution, but do filter on each writer
 * segment.
 *
 * Inputs:
 *
 * root		PlannerInfo passed by caller
 * plan		should always be result node
 * rte		is the target relation entry
 *
 * Inputs/Outputs:
 *
 * targetPolicy(in/out) is the target relation policy, and would be replaced
 * by partition relation.
 * hashExpr is distribution expression of target relation, and would be
 * replaced by partition relation.
 */
Plan *
cdbpathtoplan_create_sri_plan(RangeTblEntry *rte, PlannerInfo *subroot, Path *subpath,
							  int createplan_flags)
{
	CdbMotionPath *motionpath;
	Path	   *resultpath;
	Result	   *resultplan;
	Relation	rel;
	GpPolicy   *targetPolicy;
	List	   *hashExprs;
	List	   *hashOpfamilies;
	int			numHashAttrs;
	AttrNumber *hashAttrs;
	Oid		   *hashFuncs;
	int			i;
	ListCell   *cell;

	if (!gp_enable_fast_sri)
		return NULL;

	if (!IsA(subpath, CdbMotionPath))
		return NULL;
	motionpath = (CdbMotionPath *) subpath;

	if (IsA(motionpath->subpath, GroupResultPath))
	{
		/* ok */
	}
	else if (IsA(motionpath->subpath, ProjectionPath) &&
			 IsA(((ProjectionPath *) motionpath->subpath)->subpath, GroupResultPath))
	{
		/* ProjectionPath with a GroupResultPath beneath is also ok. */
	}
	else
		return NULL;

	resultpath = motionpath->subpath;

	if (contain_mutable_functions((Node *) resultpath->pathtarget->exprs))
		return NULL;

	resultplan = (Result *) create_plan_recurse(subroot, resultpath, createplan_flags);
	if (!IsA(resultplan, Result))
	{
		/* A GroupResultPath really should produce a Result node. */
		Assert(false);
		return NULL;
	}

	/* Suppose caller already hold proper locks for relation. */
	rel = relation_open(rte->relid, NoLock);
	targetPolicy = rel->rd_cdbpolicy;
	hashExprs = getExprListFromTargetList(resultplan->plan.targetlist,
										  targetPolicy->nattrs,
										  targetPolicy->attrs);
	hashOpfamilies = NIL;
	for (int i = 0; i < targetPolicy->nattrs; i++)
	{
		Oid			opfamily = get_opclass_family(targetPolicy->opclasses[i]);

		hashOpfamilies = lappend_oid(hashOpfamilies, opfamily);
	}

	/*
	 * If there is no distribution key, don't do direct dispatch.
	 *
	 * GPDB_90_MERGE_FIXME: Is that the right thing to do? Couldn't we
	 * direct dispatch to any arbitrarily chosen segment, in that case?
	 */
	numHashAttrs = targetPolicy->nattrs;

	if (numHashAttrs > 0)
	{
		/* Get hash functions for the columns. */
		hashFuncs = palloc(numHashAttrs * sizeof(Oid));
		i = 0;
		foreach(cell, hashExprs)
		{
			Expr	   *elem = (Expr *) lfirst(cell);
			Oid			att_type = exprType((Node *) elem);
			Oid			opclass = targetPolicy->opclasses[i];

			hashFuncs[i++] =
				cdb_hashproc_in_opfamily(get_opclass_family(opclass),
										 att_type);
		}

		/*
		 * all constants in values clause -- no need to repartition.
		 */

		/* copy the attributes array */
		hashAttrs = palloc(numHashAttrs * sizeof(AttrNumber));
		for (i = 0; i < numHashAttrs; i++)
			hashAttrs[i] = targetPolicy->attrs[i];

		if (subroot->config->gp_enable_direct_dispatch)
		{
			DirectDispatchUpdateContentIdsForInsert(subroot,
													&resultplan->plan,
													targetPolicy,
													hashFuncs);

			/*
			 * we now either have a hash-code, or we've marked the plan
			 * non-directed.
			 */
		}

		resultplan->numHashFilterCols = numHashAttrs;
		resultplan->hashFilterColIdx = hashAttrs;
		resultplan->hashFilterFuncs = hashFuncs;
	}
	else
		resultplan = NULL;

	relation_close(rel, NoLock);

	return (Plan *) resultplan;
}

/*
 * Does the given expression contain Params that are passed down from
 * outer query?
 */
bool
contains_outer_params(Node *node, void *context)
{
	PlannerInfo *root = (PlannerInfo *) context;

	if (node == NULL)
		return false;
	if (IsA(node, Param))
	{
		Param	   *param = (Param *) node;

		if (param->paramkind == PARAM_EXEC)
		{
			/* Does this Param refer to a value that an outer query provides? */
			PlannerInfo *parent = root->parent_root;

			while (parent)
			{
				ListCell   *lc;

				foreach (lc, parent->plan_params)
				{
					PlannerParamItem *ppi = (PlannerParamItem *) lfirst(lc);

					if (ppi->paramId == param->paramid)
						return true;		/* abort the tree traversal and return true */
				}

				parent = parent->parent_root;
			}
		}
	}
	return expression_tree_walker(node, contains_outer_params, context);
}

相关信息

greenplumn 源码目录

相关文章

greenplumn cdbappendonlystorageformat 源码

greenplumn cdbappendonlystorageread 源码

greenplumn cdbappendonlystoragewrite 源码

greenplumn cdbappendonlyxlog 源码

greenplumn cdbbufferedappend 源码

greenplumn cdbbufferedread 源码

greenplumn cdbcat 源码

greenplumn cdbcopy 源码

greenplumn cdbdistributedsnapshot 源码

greenplumn cdbdistributedxacts 源码

0  赞