greenplumn cdbllize source code
File path: /src/backend/cdb/cdbllize.c
/*-------------------------------------------------------------------------
*
* cdbllize.c
* Parallelize a PostgreSQL sequential plan tree.
*
* Portions Copyright (c) 2005-2008, Greenplum inc
* Portions Copyright (c) 2012-Present VMware, Inc. or its affiliates.
*
* This file contains functions to process the plan tree, at various stages in
* planning, to produce a parallelized MPP plan. Outline of the stages,
* as they happen in standard_planner():
*
* 1. Call subquery_planner(), to build the best Path.
*
* 2. Call cdbllize_adjust_top_path().
*
* cdbllize_adjust_top_path() adds Motion paths on top of the best Path, so
* that the query's final result is made available at the correct nodes.
* That's usually the QD node, so that the QD can send the query result
* back to the client, but in a DML statement or CREATE TABLE AS, the final
* result might be needed in the segments instead.
*
* (There's a similar function for Init Plans,
* cdbllize_adjust_init_plan_path(). The planner calls it when it builds
* Paths for SubPlans that are run as Init Plans before the main query.)
*
* 3. Call create_plan() to turn the best Path into a Plan tree.
*
* 4. Call cdbllize_decorate_subplans_with_motions()
*
* cdbllize_decorate_subplans_with_motions() adds Motions to subplans as
* needed, to make subplan results available in the parent nodes.
*
* 5. Call set_plan_references()
*
* 6. Call cdbllize_build_slice_table().
*
* cdbllize_build_slice_table() assigns slice IDs to Motion nodes, and
* builds the slice table needed by the executor to set up the gang of
* processes and the interconnects between them. It also eliminates unused
* subplans, and zaps some fields from the Query tree that are not needed
* after planning, to reduce the amount of data that needs to be dispatched.
*
*
* In addition to the above-mentioned functions, there's a
* motion_sanity_check() function, which is used to check that the query
* plan doesn't contain deadlock hazards involving Motions across nodes.
*
* IDENTIFICATION
* src/backend/cdb/cdbllize.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "catalog/pg_am.h"
#include "commands/defrem.h"
#include "optimizer/clauses.h"
#include "optimizer/pathnode.h"
#include "nodes/primnodes.h"
#include "nodes/parsenodes.h"
#include "nodes/plannodes.h"
#include "nodes/print.h"
#include "optimizer/paths.h"
#include "optimizer/planmain.h" /* for make_result() */
#include "parser/parsetree.h" /* for rt_fetch() */
#include "nodes/makefuncs.h" /* for makeTargetEntry() */
#include "utils/guc.h" /* for Debug_pretty_print */
#include "utils/lsyscache.h"
#include "cdb/cdbhash.h"
#include "cdb/cdbvars.h"
#include "cdb/cdbpath.h"
#include "cdb/cdbplan.h"
#include "cdb/cdbpullup.h"
#include "cdb/cdbllize.h"
#include "cdb/cdbmutate.h"
#include "cdb/cdbutil.h"
#include "optimizer/tlist.h"
#include "optimizer/optimizer.h" /* for contain_volatile_functions() */
/*
* decorate_subplans_with_motions_context holds state for the recursive
* cdbllize_decorate_subplans_with_motions() function.
*/
typedef struct decorate_subplan_info
{
int sliceDepth;
Flow *parentFlow;
bool is_initplan;
bool useHashTable;
bool processed;
} decorate_subplan_info;
typedef struct decorate_subplans_with_motions_context
{
plan_tree_base_prefix base; /* Required prefix for
* plan_tree_walker/mutator */
/* Queue of subplan IDs that we have seen, but not yet processed. */
List *subplan_workingQueue;
/*
* Array with one element for each subplan. Use subplans[plan_id] to
* access the struct for a particular subplan; entry 0 is unused, because
* plan_ids are 1-based.
*/
decorate_subplan_info *subplans;
/* Current position in the tree. */
int sliceDepth;
Flow *currentPlanFlow;
} decorate_subplans_with_motions_context;
/* State for the recursive build_slice_table() function. */
typedef struct
{
plan_tree_base_prefix base; /* Required prefix for
* plan_tree_walker/mutator */
/*
* PlanSlice structs. Indexed by slice Id. This List is converted
* into an array, for PlannedStmt->slices, at the end of
* build_slice_table().
*/
List *slices;
/* Current slice in the traversal. */
int currentSliceIndex;
/*
* Subplan IDs that we have seen. Used to prevent scanning the
* same subplan more than once, even if there are multiple SubPlan
* nodes referring to it, and to eliminate unused subplans.
*/
Bitmapset *seen_subplans;
} build_slice_table_context;
/*
* Forward Declarations
*/
static Node *fix_outer_query_motions_mutator(Node *node, decorate_subplans_with_motions_context *context);
static Plan *fix_subplan_motion(PlannerInfo *root, Plan *subplan, Flow *outer_query_flow);
static bool build_slice_table_walker(Node *node, build_slice_table_context *context);
static void adjust_top_path_for_parallel_retrieve_cursor(Path *path, PlanSlice *slice);
/*
* Create a GpPolicy that matches the natural distribution of the given plan.
*
* This is used with CREATE TABLE AS, to derive the distribution
* key for the table from the query plan.
*/
static GpPolicy *
get_partitioned_policy_from_path(PlannerInfo *root, Path *path)
{
/* Find out what the flow is partitioned on */
List *policykeys;
List *policyopclasses;
ListCell *dk_cell;
ListCell *ec_cell;
ListCell *em_cell;
/*
* Is it a Hashed distribution?
*
* NOTE: HashedOJ is not OK, because we cannot let the NULLs be stored
* on multiple segments.
*/
if (path->locus.locustype != CdbLocusType_Hashed)
{
return NULL;
}
policykeys = NIL;
policyopclasses = NIL;
foreach(dk_cell, path->locus.distkey)
{
DistributionKey *dk = lfirst(dk_cell);
bool found_expr = false;
foreach(ec_cell, dk->dk_eclasses)
{
EquivalenceClass *ec = lfirst(ec_cell);
while (ec->ec_merged)
ec = ec->ec_merged;
foreach(em_cell, ec->ec_members)
{
EquivalenceMember *em = lfirst(em_cell);
Expr *var1 = (Expr *) em->em_expr;
AttrNumber attno;
ListCell *tle_cell;
/*
* The right-side variable may be wrapped in a RelabelType node.
* Motion, however, does not care about relabel nodes.
*/
if (IsA(var1, RelabelType))
var1 = ((RelabelType *) var1)->arg;
/* See if this Expr is a column of the result table */
attno = 0;
foreach(tle_cell, root->processed_tlist)
{
TargetEntry *target = lfirst(tle_cell);
attno++;
if (target->resjunk)
continue;
if (equal(var1, target->expr))
{
/*
* If it is, use it to partition the result table, to avoid
* unnecessary redistribution of data
*/
Oid opclass;
Oid eqop;
Oid typeid;
ListCell *opfam_cell;
Assert(list_length(policykeys) < MaxPolicyAttributeNumber);
if (list_member_int(policykeys, attno))
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_COLUMN),
errmsg("duplicate DISTRIBUTED BY column '%s'",
target->resname ? target->resname : "???")));
/*
* We know the btree operator family corresponding to
* the distribution, but we don't know the exact
* hash operator class that corresponds to that. In
* the common case, the datatype has exactly one
* default operator class, and you usually use only
* that. So look up the default operator class for the
* datatype, and if it's compatible with the btree
* operator family, use that.
*
* If that fails, we could do some further checks. We
* could check if there is some other operator class
* for the datatype, and if so, use that. But it
* doesn't seem worth adding much extra code to deal
* with more obscure cases. Deriving the distribution
* key from the query plan is a heuristic, anyway.
*/
typeid = exprType((Node *) target->expr);
opclass = GetDefaultOpClass(typeid, HASH_AM_OID);
eqop = cdb_eqop_in_hash_opfamily(get_opclass_family(opclass), typeid);
foreach(opfam_cell, ec->ec_opfamilies)
{
Oid btopfamily = lfirst_oid(opfam_cell);
if (get_op_opfamily_strategy(eqop, btopfamily))
{
policykeys = lappend_int(policykeys, attno);
policyopclasses = lappend_oid(policyopclasses, opclass);
found_expr = true;
break;
}
}
}
if (found_expr)
break;
}
if (found_expr)
break;
}
if (found_expr)
break;
}
if (!found_expr)
{
/*
* This distribution key is not present in the target list. Give
* up.
*/
return NULL;
}
}
/*
* We use the default number of segments, even if the flow was partially
* distributed. That defeats the performance benefit of using the same
* distribution key columns, because we'll need a Redistribute Motion
* anyway. But presumably if the user had expanded the cluster, they want
* to use all the segments for new tables.
*/
return createHashPartitionedPolicy(policykeys,
policyopclasses,
GP_POLICY_DEFAULT_NUMSEGMENTS());
}
/*
* Get a locus that represents the desired distribution of the query result.
*
* This is a stripped-down version of the logic in cdbllize_adjust_top_path(),
* used to hint to the planner where the result is needed. Sometimes the
* final distribution is not representable as a locus; a Null locus is
* returned in that case.
*
* TODO: This only handles a few cases. For example, INSERT INTO SELECT ...
* is not handled, because the parser injects a subquery for it, which makes
* it tricky.
*/
CdbPathLocus
cdbllize_get_final_locus(PlannerInfo *root, PathTarget *target)
{
CdbPathLocus nullLocus;
Query *query = root->parse;
if (query->commandType == CMD_SELECT &&
(query->parentStmtType == PARENTSTMTTYPE_CTAS ||
query->parentStmtType == PARENTSTMTTYPE_REFRESH_MATVIEW))
{
/* CREATE TABLE AS or SELECT INTO or REFRESH MATERIALIZED VIEW */
GpPolicy *intoPolicy = query->intoPolicy;
if (intoPolicy != NULL)
{
Assert(intoPolicy->ptype != POLICYTYPE_ENTRY);
Assert(intoPolicy->nattrs >= 0);
Assert(intoPolicy->nattrs <= MaxPolicyAttributeNumber);
if (intoPolicy->ptype == POLICYTYPE_PARTITIONED &&
intoPolicy->nattrs > 0)
{
return cdbpathlocus_for_insert(root, intoPolicy, target);
}
else if (intoPolicy->ptype == POLICYTYPE_REPLICATED)
{
CdbPathLocus locus;
CdbPathLocus_MakeReplicated(&locus, intoPolicy->numsegments);
return locus;
}
}
}
else if (query->commandType == CMD_SELECT && query->parentStmtType == PARENTSTMTTYPE_NONE)
{
/*
* Query result needs to be brought back to the QD.
*
* NOTE: we don't do this for the RETURNING list, like cdbllize_adjust_top_path()
* does below. The locus we construct here is for the plan result before
* evaluating possible RETURNING clauses.
*/
CdbPathLocus entryLocus;
CdbPathLocus_MakeEntry(&entryLocus);
return entryLocus;
}
CdbPathLocus_MakeNull(&nullLocus);
return nullLocus;
}
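/*
* Illustrative sketch only (not part of the original file): how a caller
* might consume the hint returned above. CdbPathLocus_IsNull() is assumed
* here to be the test for the "no usable hint" case; the surrounding
* planner logic is elided.
*/
#if 0
static void
final_locus_hint_sketch(PlannerInfo *root, PathTarget *target)
{
	CdbPathLocus final_locus = cdbllize_get_final_locus(root, target);

	if (!CdbPathLocus_IsNull(final_locus))
	{
		/* e.g. prefer Paths whose locus already matches final_locus */
	}
}
#endif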
/*
* cdbllize_adjust_top_path -- Adjust top Path for MPP
*
* Add a Motion to the top of the query path, so that the final result
* is distributed correctly for the kind of query. For example, an INSERT
* statement's result must be sent to the correct segments where the data
* needs to be inserted, and a SELECT query's result must be brought to the
* dispatcher, so that it can be sent to the client.
*
* For a CREATE TABLE AS command, this also determines the distribution
* policy for the table, if it was not given explicitly with DISTRIBUTED BY.
*
* The input is a Path, produced by subquery_planner(). The result is a
* Path, with a Motion added on top, if needed. Also, *topslice is adjusted
* to reflect how/where the top slice needs to be executed.
*
* NB: keep cdbllize_get_final_locus() up to date with any changes here!
*/
Path *
cdbllize_adjust_top_path(PlannerInfo *root, Path *best_path,
PlanSlice *topslice)
{
Query *query = root->parse;
GpPolicy *targetPolicy = NULL;
/*
* NOTE: This code makes the assumption that if we are working on a
* hierarchy of tables, all the tables are distributed, or all are on the
* entry DB. Any mixture will fail.
*/
if (query->resultRelation > 0)
{
RangeTblEntry *rte = rt_fetch(query->resultRelation, query->rtable);
Assert(rte->rtekind == RTE_RELATION);
targetPolicy = GpPolicyFetch(rte->relid);
}
if (query->commandType == CMD_SELECT && query->parentStmtType == PARENTSTMTTYPE_CTAS)
{
/* CREATE TABLE AS or SELECT INTO */
if (query->intoPolicy != NULL)
{
targetPolicy = query->intoPolicy;
Assert(query->intoPolicy->ptype != POLICYTYPE_ENTRY);
Assert(query->intoPolicy->nattrs >= 0);
Assert(query->intoPolicy->nattrs <= MaxPolicyAttributeNumber);
}
else if (gp_create_table_random_default_distribution)
{
targetPolicy = createRandomPartitionedPolicy(GP_POLICY_DEFAULT_NUMSEGMENTS());
ereport(NOTICE,
(errcode(ERRCODE_SUCCESSFUL_COMPLETION),
errmsg("using default RANDOM distribution since no distribution was specified"),
errhint("Consider including the 'DISTRIBUTED BY' clause to determine the distribution of rows.")));
}
else
{
/* First try to deduce the distribution from the query */
targetPolicy = get_partitioned_policy_from_path(root, best_path);
/*
* If that fails, hash on the first hashable column we can
* find.
*/
if (!targetPolicy)
{
int i;
List *policykeys = NIL;
List *policyopclasses = NIL;
ListCell *lc;
i = 0;
foreach(lc, best_path->pathtarget->exprs)
{
Oid typeOid = exprType((Node *) lfirst(lc));
Oid opclass = InvalidOid;
/*
* Check for a legacy hash operator class if
* gp_use_legacy_hashops GUC is set. If
* InvalidOid is returned or the GUC is not
* set, we'll get the default operator class.
*/
if (gp_use_legacy_hashops)
opclass = get_legacy_cdbhash_opclass_for_base_type(typeOid);
if (!OidIsValid(opclass))
opclass = cdb_default_distribution_opclass_for_type(typeOid);
if (OidIsValid(opclass))
{
policykeys = lappend_int(policykeys, i + 1);
policyopclasses = lappend_oid(policyopclasses, opclass);
break;
}
i++;
}
targetPolicy = createHashPartitionedPolicy(policykeys,
policyopclasses,
GP_POLICY_DEFAULT_NUMSEGMENTS());
}
/* If we deduced the policy from the query, give a NOTICE */
if (query->parentStmtType == PARENTSTMTTYPE_CTAS)
{
StringInfoData columnsbuf;
int i;
initStringInfo(&columnsbuf);
for (i = 0; i < targetPolicy->nattrs; i++)
{
TargetEntry *target = get_tle_by_resno(root->processed_tlist, targetPolicy->attrs[i]);
if (i > 0)
appendStringInfoString(&columnsbuf, ", ");
if (target->resname)
appendStringInfoString(&columnsbuf, target->resname);
else
appendStringInfoString(&columnsbuf, "???");
}
ereport(NOTICE,
(errcode(ERRCODE_SUCCESSFUL_COMPLETION),
errmsg("Table doesn't have 'DISTRIBUTED BY' clause -- Using column(s) "
"named '%s' as the Greenplum Database data distribution key for this "
"table. ", columnsbuf.data),
errhint("The 'DISTRIBUTED BY' clause determines the distribution of data."
" Make sure column(s) chosen are the optimal data distribution key to minimize skew.")));
}
}
Assert(targetPolicy->ptype != POLICYTYPE_ENTRY);
query->intoPolicy = targetPolicy;
if (GpPolicyIsReplicated(targetPolicy) &&
!contain_volatile_functions((Node*) query->targetList) &&
!contain_volatile_functions((Node*) query->havingQual))
{
CdbPathLocus replicatedLocus;
CdbPathLocus_MakeReplicated(&replicatedLocus,
targetPolicy->numsegments);
best_path = cdbpath_create_motion_path(root,
best_path,
NIL,
false,
replicatedLocus);
}
else
{
/*
* Make sure the top level flow is partitioned on the
* partitioning key of the target relation. Since this is
* a SELECT INTO (basically same as an INSERT) command,
* the target list will correspond to the attributes of
* the target relation in order.
*/
best_path = create_motion_path_for_ctas(root, targetPolicy,
best_path);
}
topslice->numsegments = targetPolicy->numsegments;
topslice->segindex = 0;
topslice->gangType = (targetPolicy->ptype == POLICYTYPE_ENTRY) ? GANGTYPE_UNALLOCATED : GANGTYPE_PRIMARY_WRITER;
}
else if (query->commandType == CMD_SELECT && query->parentStmtType == PARENTSTMTTYPE_REFRESH_MATVIEW)
{
/*
* REFRESH MATERIALIZED VIEW
*
* This is the same as the logic for CREATE TABLE AS with an explicit
* DISTRIBUTED BY above.
*/
targetPolicy = query->intoPolicy;
if (!targetPolicy)
elog(ERROR, "materialized view has no distribution policy");
if (targetPolicy->ptype == POLICYTYPE_ENTRY)
elog(ERROR, "materialized view with entry distribution policy not supported");
if (GpPolicyIsReplicated(targetPolicy))
{
CdbPathLocus broadcastLocus;
CdbPathLocus_MakeSegmentGeneral(&broadcastLocus,
targetPolicy->numsegments);
best_path = cdbpath_create_motion_path(root,
best_path,
NIL,
false,
broadcastLocus);
}
else
{
/*
* Make sure the top level flow is partitioned on the
* partitioning key of the target relation. Since this is
* a SELECT INTO (basically same as an INSERT) command,
* the target list will correspond to the attributes of
* the target relation in order.
*/
best_path = create_motion_path_for_insert(root, targetPolicy,
best_path);
}
topslice->numsegments = targetPolicy->numsegments;
topslice->segindex = 0;
topslice->gangType = (targetPolicy->ptype == POLICYTYPE_ENTRY) ? GANGTYPE_UNALLOCATED : GANGTYPE_PRIMARY_WRITER;
}
else if (query->commandType == CMD_SELECT && query->parentStmtType == PARENTSTMTTYPE_COPY)
{
/* COPY (SELECT ...) TO */
topslice->directDispatch.isDirectDispatch = false;
topslice->directDispatch.haveProcessedAnyCalculations = true;
topslice->numsegments = getgpsegmentCount();
topslice->segindex = 0;
topslice->gangType = GANGTYPE_PRIMARY_READER;
}
else if (query->commandType == CMD_SELECT ||
query->commandType == CMD_INSERT ||
query->commandType == CMD_UPDATE ||
query->commandType == CMD_DELETE)
{
Assert(query->parentStmtType == PARENTSTMTTYPE_NONE);
if (query->commandType == CMD_SELECT || query->returningList)
{
if (!root->glob->is_parallel_cursor)
{
/*
* Query result needs to be brought back to the QD. Ask for
* a motion to bring it in. If the result already has the
* right locus, cdbpath_create_motion_path() will return it
* unmodified.
*
* If the query has an ORDER BY clause, use Merge Receive to
* preserve the ordering. The plan has already been set up to
* ensure each qExec's result is properly ordered according to
* the ORDER BY specification.
*/
CdbPathLocus entryLocus;
CdbPathLocus_MakeEntry(&entryLocus);
/*
* In a Motion to Entry locus, the numsegments indicates the
* number of segments in the *sender*.
*/
entryLocus.numsegments = best_path->locus.numsegments;
best_path = cdbpath_create_motion_path(root,
best_path,
root->sort_pathkeys,
false,
entryLocus);
topslice->numsegments = 1;
topslice->segindex = 0;
topslice->gangType = GANGTYPE_UNALLOCATED;
}
else
adjust_top_path_for_parallel_retrieve_cursor(best_path, topslice);
}
else
{
topslice->numsegments = targetPolicy->numsegments;
topslice->segindex = 0;
topslice->gangType = (targetPolicy->ptype == POLICYTYPE_ENTRY) ? GANGTYPE_UNALLOCATED : GANGTYPE_PRIMARY_WRITER;
}
}
else if (query->commandType == CMD_UTILITY)
{
/* nothing to do */
}
else
elog(ERROR, "unknown command type %d", query->commandType);
return best_path;
}
/*
* cdbllize_adjust_init_plan_path -- Adjust Init Plan's Path for MPP
*
* Init Plans are dispatched and executed before the main query. Adjust the
* Path to bring the result back to the QD.
*/
Path *
cdbllize_adjust_init_plan_path(PlannerInfo *root, Path *best_path)
{
CdbPathLocus entryLocus;
CdbPathLocus_MakeEntry(&entryLocus);
/*
* In a Motion to Entry locus, the numsegments indicates the
* number of segments in the *sender*.
*/
entryLocus.numsegments = best_path->locus.numsegments;
return cdbpath_create_motion_path(root,
best_path,
root->sort_pathkeys,
false,
entryLocus);
}
/*
* cdbllize_decorate_subplans_with_motions - Adjust subplans for MPP.
*
* This function adds Motion nodes on top of plan trees for SubPlans, so that
* the subplan results are available at the correct segment for the outer
* query. It also fixes up any MOTIONTYPE_OUTER_QUERY Motions, changing them
* into Gather or Broadcast Motions, depending on the parent slice. The plan
* tree can contain outer-query Motions, even if there are no SubPlans, if it
* contains subquery RTEs.
*
* The input is a Plan tree, from create_plan().
*
* Outline of processing:
*
* - Walk through the main Plan tree. Make note of all SubPlan expressions,
* and the context they appear in.
*
* - Recurse to process each subplan that was encountered. At the top of the
* subplan, add a Motion if needed, so that the result of the subplan is
* available in the outer slice. For example, if the SubPlan appears in
* a slice that's executed only on a single QE node, then the subplan's
* result must also be present at that node. A Gather Motion is put on
* top of the subplan, if needed. Or if the parent slice executes on all
* nodes (Replicated or Partitioned), then a Broadcast Motion is put on
* top of the subplan, unless the subplan's result is already available on
* all QEs.
*
* - If any of the subplans contained more SubPlan expressions, process those
* too.
*
* The result is a deep copy of the argument Plan tree with added/modified
* Motion nodes, or the original Plan tree unmodified if no changes are
* needed.
*
* Note: Subqueries in from-list can contain MOTIONTYPE_OUTER_QUERY Motions,
* too, so we need to scan the tree even if there are no SubPlans.
*/
Plan *
cdbllize_decorate_subplans_with_motions(PlannerInfo *root, Plan *plan)
{
Plan *result;
decorate_subplans_with_motions_context context;
int nsubplans;
/* Initialize mutator context. */
planner_init_plan_tree_base(&context.base, root);
context.sliceDepth = 0;
context.subplan_workingQueue = NIL;
nsubplans = list_length(root->glob->subplans);
context.subplans = (decorate_subplan_info *)
palloc((nsubplans + 1) * sizeof(decorate_subplan_info));
for (int i = 1; i <= nsubplans; i++)
{
decorate_subplan_info *sstate = &context.subplans[i];
sstate->sliceDepth = -1;
sstate->parentFlow = NULL;
sstate->is_initplan = false;
sstate->processed = false;
}
/*
* Process the main plan tree.
*/
context.currentPlanFlow = plan->flow;
result = (Plan *) fix_outer_query_motions_mutator((Node *) plan, &context);
/*
* Process any SubPlans that were reachable from the main plan tree.
* While we process the SubPlans, we might encounter more SubPlans.
* They will be added to the working queue, so keep going until the
* working queue is empty.
*/
while (context.subplan_workingQueue)
{
int plan_id = linitial_int(context.subplan_workingQueue);
decorate_subplan_info *sstate = &context.subplans[plan_id];
ListCell *planlist_cell = list_nth_cell(root->glob->subplans, plan_id - 1);
Plan *subplan = (Plan *) lfirst(planlist_cell);
context.subplan_workingQueue = list_delete_first(context.subplan_workingQueue);
if (sstate->processed)
{
if (!sstate->is_initplan)
elog(ERROR, "SubPlan %d already processed", plan_id);
else
continue;
}
sstate->processed = true;
if (sstate->is_initplan)
{
/*
* InitPlans are dispatched separately, before the main plan,
* and the result is brought to the QD.
*/
Flow *topFlow = makeFlow(FLOW_SINGLETON, getgpsegmentCount());
topFlow->segindex = -1;
context.sliceDepth = 0;
context.currentPlanFlow = topFlow;
}
else
{
context.sliceDepth = sstate->sliceDepth;
context.currentPlanFlow = sstate->parentFlow;
}
if (!subplan->flow)
elog(ERROR, "subplan is missing Flow information");
/*
* If the subquery result is not available where the outer query needs it,
* we have to add a Motion node to redistribute it.
*/
if (subplan->flow->locustype != CdbLocusType_OuterQuery &&
subplan->flow->locustype != CdbLocusType_SegmentGeneral &&
subplan->flow->locustype != CdbLocusType_General)
{
subplan = fix_subplan_motion(root, subplan, context.currentPlanFlow);
/*
* If we created a Motion, protect it from rescanning. Init Plans
* and hashed SubPlans are never rescanned.
*/
if (IsA(subplan, Motion) && !sstate->is_initplan &&
!sstate->useHashTable)
subplan = (Plan *) make_material(subplan);
}
subplan = (Plan *) fix_outer_query_motions_mutator((Node *) subplan, &context);
lfirst(planlist_cell) = subplan;
}
return result;
}
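/*
* Standalone illustration (not GPDB code) of the working-queue pattern used
* above: items discovered during processing are appended to the queue, and
* a 'processed' flag makes repeated references harmless, so the loop runs
* until the queue drains. All names here are made up for the demo.
*/
#if 0
#include <stdbool.h>
#include <stdio.h>

#define NPLANS 8

static bool processed[NPLANS + 1];	/* 1-based, like plan_ids */
static int	queue[NPLANS * 2];
static int	head = 0, tail = 0;

static void
note_reference(int id)
{
	/* enqueue on sight; duplicates are filtered when dequeued */
	queue[tail++] = id;
}

static void
process(int id)
{
	printf("processing subplan %d\n", id);
	/* processing may discover references to further subplans */
	if (id + 1 <= NPLANS)
		note_reference(id + 1);
}

int
main(void)
{
	note_reference(1);
	note_reference(1);			/* a second reference to the same item */
	while (head < tail)
	{
		int			id = queue[head++];

		if (processed[id])
			continue;
		processed[id] = true;
		process(id);
	}
	return 0;
}
#endif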
/*
* Workhorse for cdbllize_decorate_subplans_with_motions().
*/
static Node *
fix_outer_query_motions_mutator(Node *node, decorate_subplans_with_motions_context *context)
{
Node *newnode;
Plan *plan;
Flow *saveCurrentPlanFlow;
#ifdef USE_ASSERT_CHECKING
PlannerInfo *root = (PlannerInfo *) context->base.node;
Assert(root && IsA(root, PlannerInfo));
#endif
if (node == NULL)
return NULL;
/* An expression node might have subtrees containing plans to be mutated. */
if (!is_plan_node(node))
{
node = plan_tree_mutator(node, fix_outer_query_motions_mutator, context, false);
/*
* If we see a SubPlan, remember the context where we saw it. We memorize
* the parent node's Flow, so that we know where the SubPlan needs to
* bring the result. There might be multiple references to the same SubPlan,
* if e.g. the result of the SubPlan is passed up the plan for use in later
* joins or for the final target list. Remember the *deepest* slice level
* where we saw it. We cannot pass a SubPlan's result down the tree, but we
* can propagate it upwards if we put it in the target list. So it needs
* to be calculated at the bottom level, and propagated up from there.
*/
if (IsA(node, SubPlan))
{
SubPlan *spexpr = (SubPlan *) node;
decorate_subplan_info *sstate = &context->subplans[spexpr->plan_id];
sstate->is_initplan = spexpr->is_initplan;
sstate->useHashTable = spexpr->useHashTable;
if (sstate->processed)
elog(ERROR, "found reference to already-processed SubPlan");
if (sstate->sliceDepth == -1)
context->subplan_workingQueue = lappend_int(context->subplan_workingQueue,
spexpr->plan_id);
if (!spexpr->is_initplan && context->sliceDepth > sstate->sliceDepth)
{
sstate->sliceDepth = context->sliceDepth;
sstate->parentFlow = context->currentPlanFlow;
}
}
return node;
}
plan = (Plan *) node;
if (IsA(plan, Motion))
{
Motion *motion = (Motion *) plan;
/* sanity check: Sub plan must have flow */
Assert(motion->plan.lefttree->flow);
/* sanity check: we haven't assigned slice/motion IDs yet */
Assert(motion->motionID == 0);
/*
* Recurse into the Motion's initPlans and other fields, and the
* subtree.
*
* All the fields on the Motion node are considered part of the sending
* slice.
*/
saveCurrentPlanFlow = context->currentPlanFlow;
if (plan->lefttree->flow->locustype != CdbLocusType_OuterQuery)
context->currentPlanFlow = plan->lefttree->flow;
context->sliceDepth++;
plan = (Plan *) plan_tree_mutator((Node *) plan,
fix_outer_query_motions_mutator,
context,
false);
motion = (Motion *) plan;
context->sliceDepth--;
context->currentPlanFlow = saveCurrentPlanFlow;
/*
* If this is a Motion node in a correlated SubPlan, where we bring
* the result to the parent, the Motion was marked as type
* MOTIONTYPE_OUTER_QUERY. Because we didn't know whether the parent
* is distributed, replicated, or a single QD/QE process, when the
* subquery was planned, we fix that up here. Modify the Motion node
* so that it brings the result to the parent.
*/
if (motion->motionType == MOTIONTYPE_OUTER_QUERY)
{
Assert(!motion->sendSorted);
if (context->currentPlanFlow->flotype == FLOW_SINGLETON)
{
motion->motionType = MOTIONTYPE_GATHER;
}
else if (context->currentPlanFlow->flotype == FLOW_REPLICATED ||
context->currentPlanFlow->flotype == FLOW_PARTITIONED)
{
motion->motionType = MOTIONTYPE_BROADCAST;
}
else
elog(ERROR, "unexpected Flow type in parent of a SubPlan");
}
/*
* For a non-top slice, if this Motion is a QE singleton and the subplan's
* locus is CdbLocusType_SegmentGeneral, omit this Motion.
*/
if (context->sliceDepth > 0 &&
context->currentPlanFlow->flotype == FLOW_SINGLETON &&
context->currentPlanFlow->segindex == 0 &&
motion->plan.lefttree->flow->locustype == CdbLocusType_SegmentGeneral)
{
/*
* Omit this motion. If there were any InitPlans attached to it,
* make sure we keep them.
*/
Plan *child;
child = motion->plan.lefttree;
child->initPlan = list_concat(child->initPlan, motion->plan.initPlan);
newnode = (Node *) child;
}
else
{
newnode = (Node *) motion;
}
}
else
{
/*
* Copy this node and mutate its children. Afterwards, this node should be
* an exact image of the input node, except that contained nodes requiring
* parallelization will have had it applied.
*/
saveCurrentPlanFlow = context->currentPlanFlow;
if (plan->flow != NULL && plan->flow->locustype != CdbLocusType_OuterQuery)
context->currentPlanFlow = plan->flow;
newnode = plan_tree_mutator(node, fix_outer_query_motions_mutator, context, false);
context->currentPlanFlow = saveCurrentPlanFlow;
}
return newnode;
}
/*
* Add a Motion node on top of a Plan if needed, to make the result available
* in 'outer_query_flow'. Subroutine of cdbllize_decorate_subplans_with_motions().
*/
static Plan *
fix_subplan_motion(PlannerInfo *root, Plan *subplan, Flow *outer_query_flow)
{
bool need_motion;
if (outer_query_flow->flotype == FLOW_SINGLETON)
{
if (subplan->flow->flotype != FLOW_SINGLETON)
need_motion = true;
else
need_motion = false;
}
else if (outer_query_flow->flotype == FLOW_REPLICATED ||
outer_query_flow->flotype == FLOW_PARTITIONED)
{
// FIXME: Isn't it pointless to broadcast if it's already replicated?
need_motion = true;
}
else
elog(ERROR, "unexpected flow type %d in parent of SubPlan expression",
outer_query_flow->flotype);
if (need_motion)
{
/*
* We need to add a Motion to the top.
*/
PlanSlice *sendSlice;
Motion *motion;
Flow *subFlow = subplan->flow;
List *initPlans = NIL;
Assert(subFlow);
/*
* Strip off any Material nodes at the top. There's no point in
* materializing just below a Motion, because a Motion is never
* rescanned.
*
* If we strip off any nodes, make sure we preserve any initPlans
* that were attached to them.
*/
while (IsA(subplan, Material))
{
initPlans = list_concat(initPlans, subplan->initPlan);
subplan = subplan->lefttree;
}
/*
* If there's an existing Motion on top, we can remove it, and
* replace it with a new Motion. But make sure we reuse the sender
* slice information. We could build a new one here, but then we'd
* lose any direct dispatch information.
*
* Otherwise, build a new sender slice.
*/
if (IsA(subplan, Motion))
{
Motion *strippedMotion = (Motion *) subplan;
sendSlice = strippedMotion->senderSliceInfo;
subplan = subplan->lefttree;
initPlans = list_concat(initPlans, strippedMotion->plan.initPlan);
}
else
{
sendSlice = palloc0(sizeof(PlanSlice));
sendSlice->gangType = GANGTYPE_PRIMARY_READER;
sendSlice->sliceIndex = -1;
if (subFlow->flotype != FLOW_SINGLETON)
{
sendSlice->numsegments = subFlow->numsegments;
sendSlice->segindex = 0;
}
else
{
if (subFlow->segindex == -1)
sendSlice->gangType = GANGTYPE_ENTRYDB_READER;
else
sendSlice->gangType = GANGTYPE_SINGLETON_READER;
sendSlice->numsegments = 1;
sendSlice->segindex = subFlow->segindex;
}
}
motion = make_motion(NULL, subplan,
0, NULL, NULL, NULL, NULL /* no ordering */);
motion->motionType = (outer_query_flow->flotype == FLOW_SINGLETON) ?
MOTIONTYPE_GATHER : MOTIONTYPE_BROADCAST;
motion->hashExprs = NIL;
motion->hashFuncs = NULL;
motion->plan.lefttree->flow = subFlow;
motion->plan.initPlan = initPlans;
motion->senderSliceInfo = sendSlice;
subplan = (Plan *) motion;
}
return subplan;
}
/*
* cdbllize_build_slice_table -- Build the (planner) slice table for a query.
*/
void
cdbllize_build_slice_table(PlannerInfo *root, Plan *top_plan,
PlanSlice *top_slice)
{
PlannerGlobal *glob = root->glob;
Query *query = root->parse;
build_slice_table_context cxt;
ListCell *lc;
int sliceIndex;
bool all_root_slices;
/*
* This can modify nodes, so the nodes we memorized earlier are no longer
* valid. Clear this array just to be sure we don't accidentally use the
* obsolete copies of the nodes later on.
*/
if (glob->share.shared_plans)
{
pfree(glob->share.shared_plans);
glob->share.shared_plans = NULL;
}
/* subplan_sliceIds array needs to exist, even in non-dispatcher mode */
root->glob->subplan_sliceIds = palloc(list_length(root->glob->subplans) * sizeof(int));
for(int i = 0; i < list_length(root->glob->subplans); i++)
root->glob->subplan_sliceIds[i] = -1;
/*
* In non-dispatcher mode, there are no motions and no dispatching.
* In utility mode, we still need a slice; at least some of the
* EXPLAIN code expects it.
*/
if (Gp_role == GP_ROLE_UTILITY)
{
PlanSlice *dummySlice;
dummySlice = palloc0(sizeof(PlanSlice));
dummySlice->sliceIndex = 0;
dummySlice->parentIndex = 0;
dummySlice->gangType = GANGTYPE_UNALLOCATED;
dummySlice->numsegments = 0;
dummySlice->segindex = -1;
root->glob->numSlices = 1;
root->glob->slices = dummySlice;
return;
}
if (Gp_role == GP_ROLE_EXECUTE)
return;
Assert(Gp_role == GP_ROLE_DISPATCH);
planner_init_plan_tree_base(&cxt.base, root);
cxt.seen_subplans = NULL;
Assert(root->glob->slices == NULL);
top_slice->sliceIndex = 0;
top_slice->parentIndex = -1;
cxt.slices = list_make1(top_slice);
cxt.currentSliceIndex = 0;
/*
* Walk through the main plan tree, and recursively all SubPlans.
* Create a new PlanSlice, and assign a slice ID at every Motion
* and Init Plan. Also makes note of all the Subplans that are
* reachable from the top plan, so that unused SubPlans can be
* eliminated later.
*/
(void) build_slice_table_walker((Node *) top_plan, &cxt);
/*
* Remove unused Init Plans.
*/
remove_unused_initplans(top_plan, root);
/*
* Remove unused subplans.
*
* The executor initializes state for subplans even if they are unused.
* If an unused subplan contains a Motion, that Motion's motionID is never
* assigned, which would trip a sanity check when the executor initializes
* the subplan's state.
*/
int plan_id = 1;
foreach(lc, root->glob->subplans)
{
if (!bms_is_member(plan_id, cxt.seen_subplans))
{
Plan *dummy_plan;
/*
* This subplan is unused. Replace it in the global list of
* subplans with a dummy. (We can't just remove it from the global
* list, because that would screw up the plan_id numbering of the
* subplans).
*/
pfree(lfirst(lc));
dummy_plan = (Plan *) make_result(NIL,
(Node *) list_make1(makeBoolConst(false, false)),
NULL);
lfirst(lc) = dummy_plan;
}
plan_id++;
}
/*
* If none of the slices require dispatching, we can run everything in one slice.
*/
all_root_slices = true;
foreach (lc, cxt.slices)
{
PlanSlice *slice = (PlanSlice *) lfirst(lc);
if (slice->gangType != GANGTYPE_UNALLOCATED)
{
all_root_slices = false;
break;
}
}
if (all_root_slices)
{
for(int i = 0; i < list_length(root->glob->subplans); i++)
root->glob->subplan_sliceIds[i] = -1;
cxt.slices = list_truncate(cxt.slices, 1);
}
/*
* Turn the list of PlanSlices into an array, for dispatching.
*/
root->glob->numSlices = list_length(cxt.slices);
root->glob->slices = palloc(root->glob->numSlices * sizeof(PlanSlice));
sliceIndex = 0;
foreach (lc, cxt.slices)
{
PlanSlice *src = (PlanSlice *) lfirst(lc);
memcpy(&root->glob->slices[sliceIndex], src, sizeof(PlanSlice));
sliceIndex++;
}
list_free_deep(cxt.slices);
/*
* Discard subtrees of Query node that aren't needed for execution. Note
* the targetlist (query->targetList) is used in execution of prepared
* statements, so we leave that alone.
*/
query->jointree = NULL;
query->groupClause = NIL;
query->havingQual = NULL;
query->distinctClause = NIL;
query->sortClause = NIL;
query->limitCount = NULL;
query->limitOffset = NULL;
query->setOperations = NULL;
}
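/*
* Worked example (illustration only): for a simple distributed table 't',
* a plain "SELECT * FROM t" plans as a Gather Motion over a Seq Scan. The
* walker below then produces:
*
*   slice 0: the top slice (GANGTYPE_UNALLOCATED, runs in the QD);
*   slice 1: the Motion's sender slice; the Motion gets motionID = 1, and
*            the slice runs as a reader gang on the segments.
*
* root->glob->slices then becomes a 2-element array of these PlanSlices.
*/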
static bool
build_slice_table_walker(Node *node, build_slice_table_context *context)
{
PlannerInfo *root = (PlannerInfo *) context->base.node;
if (node == NULL)
return false;
if (IsA(node, SubPlan))
{
SubPlan *spexpr = (SubPlan *) node;
int plan_id = spexpr->plan_id;
/*
* Recurse into each subplan only once, even if there are multiple
* SubPlans referring to it.
*/
if (!bms_is_member(plan_id, context->seen_subplans))
{
bool result;
int save_currentSliceIndex;
context->seen_subplans = bms_add_member(context->seen_subplans, plan_id);
save_currentSliceIndex = context->currentSliceIndex;
if (spexpr->is_initplan)
{
PlanSlice *initTopSlice;
initTopSlice = palloc0(sizeof(PlanSlice));
initTopSlice->gangType = GANGTYPE_UNALLOCATED;
initTopSlice->numsegments = 0;
initTopSlice->segindex = -1;
initTopSlice->parentIndex = -1;
initTopSlice->sliceIndex = list_length(context->slices);
context->slices = lappend(context->slices, initTopSlice);
context->currentSliceIndex = initTopSlice->sliceIndex;
}
root->glob->subplan_sliceIds[plan_id - 1] = context->currentSliceIndex;
result = plan_tree_walker(node,
build_slice_table_walker,
context,
true);
context->currentSliceIndex = save_currentSliceIndex;
return result;
}
}
if (IsA(node, Motion))
{
Motion *motion = (Motion *) node;
PlanSlice *sendSlice;
int save_currentSliceIndex;
bool result;
Assert(motion->motionID == 0);
/* set up a new slice */
sendSlice = motion->senderSliceInfo;
Assert(sendSlice->sliceIndex == -1);
sendSlice->sliceIndex = list_length(context->slices);
sendSlice->parentIndex = context->currentSliceIndex;
motion->motionID = sendSlice->sliceIndex;
motion->senderSliceInfo = NULL;
context->slices = lappend(context->slices, sendSlice);
save_currentSliceIndex = context->currentSliceIndex;
context->currentSliceIndex = sendSlice->sliceIndex;
if (sendSlice->directDispatch.isDirectDispatch &&
sendSlice->directDispatch.contentIds == NIL)
{
/*
* Direct dispatch, but we've already determined that there will
* be no match on any segment. It doesn't matter which segment produces
* the (empty) result.
*
* XXX: Would've been better to replace this with a dummy Result
* node or something in the planner.
*/
sendSlice->directDispatch.contentIds = list_make1_int(0);
}
result = plan_tree_walker((Node *) motion,
build_slice_table_walker,
context,
false);
context->currentSliceIndex = save_currentSliceIndex;
return result;
}
return plan_tree_walker(node,
build_slice_table_walker,
context,
false);
}
/*
* Construct a new Flow in the current memory context.
*/
Flow *
makeFlow(FlowType flotype, int numsegments)
{
Flow *flow = makeNode(Flow);
Assert(numsegments > 0);
flow->flotype = flotype;
flow->locustype = CdbLocusType_Null;
flow->numsegments = numsegments;
return flow;
}
/*
* Is the node a "subclass" of Plan?
*/
bool
is_plan_node(Node *node)
{
if (node == NULL)
return false;
if (nodeTag(node) >= T_Plan_Start && nodeTag(node) < T_Plan_End)
return true;
return false;
}
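/*
* Standalone illustration (not GPDB code) of the tag-range check used in
* is_plan_node(): when all Plan node tags occupy a contiguous range of the
* NodeTag enum, "is it a plan node?" becomes two integer comparisons. The
* demo enum below is made up for illustration.
*/
#if 0
#include <stdbool.h>
#include <stdio.h>

typedef enum DemoTag
{
	T_Demo_Invalid = 0,
	T_Demo_Plan_Start,
	T_Demo_SeqScan = T_Demo_Plan_Start,
	T_Demo_Motion,
	T_Demo_Plan_End,			/* one past the last plan tag */
	T_Demo_Var					/* an expression tag, outside the range */
} DemoTag;

static bool
demo_is_plan_tag(DemoTag tag)
{
	return tag >= T_Demo_Plan_Start && tag < T_Demo_Plan_End;
}

int
main(void)
{
	printf("%d %d\n",
		   demo_is_plan_tag(T_Demo_Motion),	/* 1 */
		   demo_is_plan_tag(T_Demo_Var));	/* 0 */
	return 0;
}
#endif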
#define SANITY_MOTION 0x1
#define SANITY_DEADLOCK 0x2
typedef struct sanity_result_t
{
plan_tree_base_prefix base; /* Required prefix for
* plan_tree_walker/mutator */
int flags;
} sanity_result_t;
static bool
motion_sanity_walker(Node *node, sanity_result_t *result)
{
sanity_result_t left_result;
sanity_result_t right_result;
bool deadlock_safe = false;
char *branch_label;
left_result.base = result->base;
left_result.flags = 0;
right_result.base = result->base;
right_result.flags = 0;
if (node == NULL)
return false;
if (!is_plan_node(node))
return false;
/*
* Special handling for branch points because there is a possibility of a
* deadlock if there are Motions in both branches and one side is not
* first pre-fetched.
*
* The deadlock occurs because, when the buffers on the Send side of a
* Motion are completely filled with tuples, it blocks waiting for an ACK.
* Without prefetch_inner, the Join node reads one tuple from the outer
* side first and then starts retrieving tuples from the inner side -
* either to build a hash table (in case of HashJoin) or for joining (in
* case of MergeJoin and NestedLoopJoin).
*
* Thus we can end up with 4 processes infinitely waiting for each other:
*
* A: Join slice that already retrieved an outer tuple, and is waiting
*    for inner tuples from D.
* B: Join slice that is still waiting for the first outer tuple from C.
* C: Outer slice whose buffer is full sending tuples to A, and is blocked
*    waiting for an ACK from A.
* D: Inner slice whose buffer is full sending tuples to B, and is blocked
*    waiting for an ACK from B.
*
* A cannot ACK C because it is waiting to finish retrieving inner tuples
* from D. B cannot ACK D because it is waiting for its first outer tuple
* from C before accepting any inner tuples. This forms a circular
* dependency resulting in a deadlock: C -> A -> D -> B -> C.
*
* We avoid this by pre-fetching all the inner tuples in such cases and
* materializing them in some fashion, before moving on to the outer tuples.
* This effectively breaks the cycle and prevents deadlock.
*
* Details:
* https://groups.google.com/a/greenplum.org/forum/#!msg/gpdb-dev/gMa1tW0x_fk/wuzvGXBaBAAJ
*/
switch (nodeTag(node))
{
case T_HashJoin: /* Hash join can't deadlock -- it fully
* materializes its inner before switching to
* its outer. */
branch_label = "HJ";
if (((HashJoin *) node)->join.prefetch_inner)
deadlock_safe = true;
break;
case T_NestLoop: /* Nested loop joins are safe only if the
* prefetch flag is set */
branch_label = "NL";
if (((NestLoop *) node)->join.prefetch_inner)
deadlock_safe = true;
break;
case T_MergeJoin:
branch_label = "MJ";
if (((MergeJoin *) node)->join.prefetch_inner)
deadlock_safe = true;
break;
default:
branch_label = NULL;
break;
}
/* now scan the subplans */
switch (nodeTag(node))
{
case T_Result:
case T_WindowAgg:
case T_TableFunctionScan:
case T_ShareInputScan:
case T_Append:
case T_MergeAppend:
case T_SeqScan:
case T_SampleScan:
case T_IndexScan:
case T_BitmapIndexScan:
case T_BitmapHeapScan:
case T_TidScan:
case T_SubqueryScan:
case T_FunctionScan:
case T_ValuesScan:
case T_Agg:
case T_TupleSplit:
case T_Unique:
case T_Hash:
case T_SetOp:
case T_Limit:
case T_Sort:
case T_Material:
case T_ForeignScan:
if (plan_tree_walker(node, motion_sanity_walker, result, true))
return true;
break;
case T_Motion:
if (plan_tree_walker(node, motion_sanity_walker, result, true))
return true;
result->flags |= SANITY_MOTION;
elog(DEBUG5, " found motion");
break;
case T_HashJoin:
case T_NestLoop:
case T_MergeJoin:
{
Plan *plan = (Plan *) node;
elog(DEBUG5, " %s going left", branch_label);
if (motion_sanity_walker((Node *) plan->lefttree, &left_result))
return true;
elog(DEBUG5, " %s going right", branch_label);
if (motion_sanity_walker((Node *) plan->righttree, &right_result))
return true;
elog(DEBUG5, " %s branch point left 0x%x right 0x%x",
branch_label, left_result.flags, right_result.flags);
/* deadlocks get sent up immediately */
if ((left_result.flags & SANITY_DEADLOCK) ||
(right_result.flags & SANITY_DEADLOCK))
{
result->flags |= SANITY_DEADLOCK;
break;
}
/*
* if this node is "deadlock safe" then even if we have motion
* on both sides we will not deadlock (because the access
* pattern is deadlock safe: all rows are retrieved from one
* side before the first row from the other).
*/
if (!deadlock_safe && ((left_result.flags & SANITY_MOTION) &&
(right_result.flags & SANITY_MOTION)))
{
elog(LOG, "FOUND MOTION DEADLOCK in %s", branch_label);
result->flags |= SANITY_DEADLOCK;
break;
}
result->flags |= left_result.flags | right_result.flags;
elog(DEBUG5, " %s branch point left 0x%x right 0x%x res 0x%x%s",
branch_label, left_result.flags, right_result.flags, result->flags, deadlock_safe ? " deadlock safe: prefetching" : "");
}
break;
default:
break;
}
return false;
}
void
motion_sanity_check(PlannerInfo *root, Plan *plan)
{
sanity_result_t sanity_result;
planner_init_plan_tree_base(&sanity_result.base, root);
sanity_result.flags = 0;
elog(DEBUG5, "Motion Deadlock Sanity Check");
if (motion_sanity_walker((Node *) plan, &sanity_result))
elog(ERROR, "motion sanity walker returned true");
if (sanity_result.flags & SANITY_DEADLOCK)
elog(ERROR, "Post-planning sanity check detected motion deadlock.");
}
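/*
* Standalone illustration (not GPDB code) of the hazard rule checked above:
* a join is flagged as a deadlock hazard when both of its inputs contain a
* Motion and the join does not prefetch its inner side. The tree and field
* names here are made up for the demo.
*/
#if 0
#include <stdbool.h>
#include <stdio.h>

#define DEMO_MOTION		0x1
#define DEMO_DEADLOCK	0x2

typedef struct DemoNode
{
	struct DemoNode *left;
	struct DemoNode *right;
	bool		is_motion;
	bool		is_join;
	bool		prefetch_inner;
} DemoNode;

static int
demo_walk(DemoNode *n)
{
	int			flags = 0;

	if (n == NULL)
		return 0;
	if (n->is_join)
	{
		int			lf = demo_walk(n->left);
		int			rf = demo_walk(n->right);

		/* hazards propagate up; a new one appears if both sides have Motions */
		flags = lf | rf;
		if (!n->prefetch_inner &&
			(lf & DEMO_MOTION) && (rf & DEMO_MOTION))
			flags |= DEMO_DEADLOCK;
	}
	else
		flags = demo_walk(n->left) | demo_walk(n->right);
	if (n->is_motion)
		flags |= DEMO_MOTION;
	return flags;
}

int
main(void)
{
	DemoNode	outer = {0}, inner = {0}, join = {0};

	outer.is_motion = inner.is_motion = true;
	join.is_join = true;
	join.left = &outer;
	join.right = &inner;
	printf("hazard: %d\n", (demo_walk(&join) & DEMO_DEADLOCK) != 0);	/* 1 */
	join.prefetch_inner = true;
	printf("hazard: %d\n", (demo_walk(&join) & DEMO_DEADLOCK) != 0);	/* 0 */
	return 0;
}
#endif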
static void
adjust_top_path_for_parallel_retrieve_cursor(Path *path, PlanSlice *slice)
{
if (CdbPathLocus_IsSingleQE(path->locus)
|| CdbPathLocus_IsGeneral(path->locus)
|| CdbPathLocus_IsEntry(path->locus))
{
/*
* For these scenarios, a parallel retrieve cursor needs to run on the
* entry DB, since the endpoint QE needs to interact with the retrieve
* connections.
*/
slice->numsegments = 1;
slice->gangType = GANGTYPE_ENTRYDB_READER;
}
else if (CdbPathLocus_IsSegmentGeneral(path->locus))
{
/*
* Queries against a replicated table run on a single segment.
*/
slice->segindex = gp_session_id % path->locus.numsegments;
slice->numsegments = path->locus.numsegments;
slice->gangType = GANGTYPE_SINGLETON_READER;
}
else
{
/*
* Queries against a non-replicated table run on the segments.
*/
slice->numsegments = path->locus.numsegments;
slice->gangType = GANGTYPE_PRIMARY_READER;
}
}