greenplum nodeAppend 源码

  • 2022-08-18
  • 浏览 (339)

greenplum nodeAppend 代码

文件路径:/src/backend/executor/nodeAppend.c

/*-------------------------------------------------------------------------
 *
 * nodeAppend.c
 *	  routines to handle append nodes.
 *
 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/backend/executor/nodeAppend.c
 *
 *-------------------------------------------------------------------------
 */
/* INTERFACE ROUTINES
 *		ExecInitAppend	- initialize the append node
 *		ExecAppend		- retrieve the next tuple from the node
 *		ExecEndAppend	- shut down the append node
 *		ExecReScanAppend - rescan the append node
 *
 *	 NOTES
 *		Each append node contains a list of one or more subplans which
 *		must be iteratively processed (forwards or backwards).
 *		Tuples are retrieved by executing the 'whichplan'th subplan
 *		until the subplan stops returning tuples, at which point that
 *		plan is shut down and the next started up.
 *
 *		Append nodes don't make use of their left and right
 *		subtrees, rather they maintain a list of subplans so
 *		a typical append node looks like this in the plan tree:
 *
 *				   ...
 *				   /
 *				Append -------+------+------+--- nil
 *				/	\		  |		 |		|
 *			  nil	nil		 ...    ...    ...
 *								 subplans
 *
 *		Append nodes are currently used for unions, and to support
 *		inheritance queries, where several relations need to be scanned.
 *		For example, in our standard person/student/employee/student-emp
 *		example, where student and employee inherit from person
 *		and student-emp inherits from student and employee, the
 *		query:
 *
 *				select name from person
 *
 *		generates the plan:
 *
 *				  |
 *				Append -------+-------+--------+--------+
 *				/	\		  |		  |		   |		|
 *			  nil	nil		 Scan	 Scan	  Scan	   Scan
 *							  |		  |		   |		|
 *							person employee student student-emp
 */

#include "postgres.h"

#include "executor/execdebug.h"
#include "executor/execPartition.h"
#include "executor/nodeAppend.h"
#include "miscadmin.h"

/*
 * Shared state for parallel-aware Append.
 *
 * One instance is allocated in the parallel query's shared memory by
 * ExecAppendInitializeDSM and looked up by each worker in
 * ExecAppendInitializeWorker.  Its size is computed in ExecAppendEstimate as
 * the offset of pa_finished plus one bool per initialized subplan
 * (as_nplans).
 */
struct ParallelAppendState
{
	/*
	 * Taken in LW_EXCLUSIVE mode by choose_next_subplan_for_leader and
	 * choose_next_subplan_for_worker around every read or write of the
	 * fields below.
	 */
	LWLock		pa_lock;		/* mutual exclusion to choose next subplan */

	/*
	 * Index of the subplan the next worker should try first.  Starts at 0
	 * and is set to INVALID_SUBPLAN_INDEX once a participant has determined
	 * that no subplan is left to hand out.
	 */
	int			pa_next_plan;	/* next plan to choose by any worker */

	/*
	 * pa_finished[i] should be true if no more workers should select subplan
	 * i.  for a non-partial plan, this should be set to true as soon as a
	 * worker selects the plan; for a partial plan, it remains false until
	 * some worker executes the plan to completion.
	 *
	 * Subplans rejected by run-time pruning are also flagged here (see
	 * mark_invalid_subplans_as_finished) so that the selection loops skip
	 * them.
	 */
	bool		pa_finished[FLEXIBLE_ARRAY_MEMBER];
};

/*
 * Sentinel values stored in AppendState.as_whichplan.  Both are negative so
 * that ExecAppend can detect "no current subplan" with a single comparison.
 *
 * INVALID_SUBPLAN_INDEX: no subplan has been chosen yet (set at init and
 *		rescan); the choose_next_subplan_* callback picks the first one.
 * NO_MATCHING_SUBPLANS: initial pruning eliminated every subplan, so the
 *		node produces no tuples at all.
 */
#define INVALID_SUBPLAN_INDEX		-1
#define NO_MATCHING_SUBPLANS		-2

/* Node execution callback installed in ps.ExecProcNode */
static TupleTableSlot *ExecAppend(PlanState *pstate);

/* Subplan selection strategies installed in AppendState.choose_next_subplan */
static bool choose_next_subplan_locally(AppendState *node);
static bool choose_next_subplan_for_leader(AppendState *node);
static bool choose_next_subplan_for_worker(AppendState *node);

/* Helper for the parallel strategies when run-time pruning is in effect */
static void mark_invalid_subplans_as_finished(AppendState *node);

/* ----------------------------------------------------------------
 *		ExecInitAppend
 *
 *		Build the AppendState for an Append plan node and initialize
 *		its subplans.
 *
 *		All subplans are initialized unless initial partition pruning
 *		is performed, in which case only the surviving subplans are
 *		(or just the first one, if none survive).  The resulting
 *		appendplans array is dense: it holds as_nplans entries, indexed
 *		by position among the initialized subplans rather than by
 *		position in the plan's appendplans list.
 *
 *	   (This is potentially wasteful, since the entire result of the
 *		append node may not be scanned, but this way all of the
 *		structures get allocated in the executor's top level memory
 *		block instead of that of the call to ExecAppend.)
 *
 *		node	- the Append plan node
 *		estate	- executor state the new node belongs to
 *		eflags	- executor flags, passed through to each subplan;
 *				  EXEC_FLAG_MARK is not supported
 *
 *		Returns the newly created AppendState.
 * ----------------------------------------------------------------
 */
AppendState *
ExecInitAppend(Append *node, EState *estate, int eflags)
{
	AppendState *appendstate = makeNode(AppendState);
	PlanState **appendplanstates;
	Bitmapset  *validsubplans;	/* plan-list indexes of subplans to init */
	int			nplans;			/* number of subplans to initialize */
	int			firstvalid;
	int			i,				/* index into node->appendplans */
				j;				/* index into appendplanstates */
	ListCell   *lc;

	/* check for unsupported flags */
	Assert(!(eflags & EXEC_FLAG_MARK));

	/*
	 * create new AppendState for our append node
	 */
	appendstate->ps.plan = (Plan *) node;
	appendstate->ps.state = estate;
	appendstate->ps.ExecProcNode = ExecAppend;

	/* Let choose_next_subplan_* function handle setting the first subplan */
	appendstate->as_whichplan = INVALID_SUBPLAN_INDEX;

	/* If run-time partition pruning is enabled, then set that up now */
	if (node->part_prune_info != NULL)
	{
		PartitionPruneState *prunestate;

		/* We may need an expression context to evaluate partition exprs */
		ExecAssignExprContext(estate, &appendstate->ps);

		/* Create the working data structure for pruning. */
		prunestate = ExecCreatePartitionPruneState(&appendstate->ps,
												   node->part_prune_info);
		appendstate->as_prune_state = prunestate;

		/* Perform an initial partition prune, if required. */
		if (prunestate->do_initial_prune)
		{
			/* Determine which subplans survive initial pruning */
			validsubplans = ExecFindInitialMatchingSubPlans(prunestate,
															list_length(node->appendplans));

			/*
			 * The case where no subplans survive pruning must be handled
			 * specially.  The problem here is that code in explain.c requires
			 * an Append to have at least one subplan in order for it to
			 * properly determine the Vars in that subplan's targetlist.  We
			 * sidestep this issue by just initializing the first subplan and
			 * setting as_whichplan to NO_MATCHING_SUBPLANS to indicate that
			 * we don't really need to scan any subnodes.
			 */
			if (bms_is_empty(validsubplans))
			{
				appendstate->as_whichplan = NO_MATCHING_SUBPLANS;

				/* Mark the first as valid so that it's initialized below */
				validsubplans = bms_make_singleton(0);
			}

			nplans = bms_num_members(validsubplans);
		}
		else
		{
			/* We'll need to initialize all subplans */
			nplans = list_length(node->appendplans);
			Assert(nplans > 0);
			validsubplans = bms_add_range(NULL, 0, nplans - 1);
		}

		/*
		 * If no runtime pruning is required, we can fill as_valid_subplans
		 * immediately, preventing later calls to ExecFindMatchingSubPlans.
		 *
		 * Note the set is 0 .. nplans-1, i.e. it is expressed in terms of the
		 * dense appendplanstates array built below, not the original plan
		 * list positions held in validsubplans.
		 */
		if (!prunestate->do_exec_prune)
		{
			Assert(nplans > 0);
			appendstate->as_valid_subplans = bms_add_range(NULL, 0, nplans - 1);
		}
	}
	else
	{
		nplans = list_length(node->appendplans);

		/*
		 * When run-time partition pruning is not enabled we can just mark all
		 * subplans as valid; they must also all be initialized.
		 */
		Assert(nplans > 0);
		appendstate->as_valid_subplans = validsubplans =
			bms_add_range(NULL, 0, nplans - 1);
		appendstate->as_prune_state = NULL;

		/*
		 * With join_prune_paramids set, leave as_valid_subplans unset so that
		 * the first choose_next_subplan_* call computes it through
		 * ExecFindMatchingSubPlans.  validsubplans still references the full
		 * set, so every subplan is initialized below.
		 *
		 * NOTE(review): join_prune_paramids is presumably the Greenplum
		 * join-driven partition pruning parameter list -- confirm against the
		 * Append node definition.
		 */
		if (node->join_prune_paramids)
			appendstate->as_valid_subplans = NULL;
	}

	/*
	 * Initialize result tuple type and slot.
	 */
	ExecInitResultTupleSlotTL(&appendstate->ps, &TTSOpsVirtual);

	/* node returns slots from each of its subnodes, therefore not fixed */
	appendstate->ps.resultopsset = true;
	appendstate->ps.resultopsfixed = false;

	appendplanstates = (PlanState **) palloc(nplans *
											 sizeof(PlanState *));

	/*
	 * call ExecInitNode on each of the valid plans to be executed and save
	 * the results into the appendplanstates array.
	 *
	 * While at it, find out the first valid partial plan.
	 */
	j = i = 0;
	/* nplans means "no partial plan among the initialized subplans" */
	firstvalid = nplans;
	foreach(lc, node->appendplans)
	{
		if (bms_is_member(i, validsubplans))
		{
			Plan	   *initNode = (Plan *) lfirst(lc);

			/*
			 * Record the lowest appendplans index which is a valid partial
			 * plan.  The value stored is j, the position in the dense
			 * appendplanstates array.
			 */
			if (i >= node->first_partial_plan && j < firstvalid)
				firstvalid = j;

			appendplanstates[j++] = ExecInitNode(initNode, estate, eflags);
		}
		i++;
	}

	appendstate->as_first_partial_plan = firstvalid;
	appendstate->appendplans = appendplanstates;
	appendstate->as_nplans = nplans;

	/*
	 * Miscellaneous initialization
	 */

	/* Append never projects; subplan tuples are returned as-is */
	appendstate->ps.ps_ProjInfo = NULL;

	/* For parallel query, this will be overridden later. */
	appendstate->choose_next_subplan = choose_next_subplan_locally;

	return appendstate;
}

/* ----------------------------------------------------------------
 *	   ExecAppend
 *
 *		Produce the next tuple of the Append node.  Tuples are pulled
 *		from the current subplan; whenever it runs dry the
 *		choose_next_subplan callback is asked for another one.  An
 *		empty result slot is returned once no subplan is left, or
 *		immediately if no subplan matched at all.
 * ----------------------------------------------------------------
 */
static TupleTableSlot *
ExecAppend(PlanState *pstate)
{
	AppendState *node = castNode(AppendState, pstate);
	TupleTableSlot *tuple;

	/*
	 * First call (or first call after a rescan): no subplan has been picked
	 * yet, so pick one now.  If there is none, we are already done.
	 */
	if (node->as_whichplan == INVALID_SUBPLAN_INDEX)
	{
		if (!node->choose_next_subplan(node))
			return ExecClearTuple(node->ps.ps_ResultTupleSlot);
	}

	/* Pruning at init time left nothing to scan. */
	if (node->as_whichplan == NO_MATCHING_SUBPLANS)
		return ExecClearTuple(node->ps.ps_ResultTupleSlot);

	do
	{
		CHECK_FOR_INTERRUPTS();

		/* The current subplan index must point into the subplan array. */
		Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as_nplans);

		tuple = ExecProcNode(node->appendplans[node->as_whichplan]);

		/*
		 * Hand the subplan's own slot straight back to the caller; the
		 * result slot created in ExecInitAppend is only used to signal
		 * end-of-data.
		 */
		if (!TupIsNull(tuple))
			return tuple;

		/* Current subplan is exhausted: move on, or stop if none remain. */
	} while (node->choose_next_subplan(node));

	return ExecClearTuple(node->ps.ps_ResultTupleSlot);
}

/* ----------------------------------------------------------------
 *		ExecEndAppend
 *
 *		Shut down every initialized subplan of the append node,
 *		walking the subplan array from the last entry to the first.
 *		NULL entries are skipped.
 * ----------------------------------------------------------------
 */
void
ExecEndAppend(AppendState *node)
{
	PlanState **subplans = node->appendplans;
	int			idx = node->as_nplans;

	while (--idx >= 0)
	{
		PlanState  *child = subplans[idx];

		if (child != NULL)
			ExecEndNode(child);
	}
}

/*
 * ExecReScanAppend
 *		Prepare the append node for being scanned again from the start.
 *
 * Propagates changed parameters to every initialized subplan, rescans the
 * subplans that will not be rescanned lazily, and forgets the current
 * subplan so that the next ExecAppend call chooses afresh.
 */
void
ExecReScanAppend(AppendState *node)
{
	int			plan_no;

	/*
	 * The set of valid subplans depends on the PARAM_EXEC Params used by the
	 * pruning expressions.  If any of those changed, throw the cached set
	 * away; it is recomputed for the new values when a subplan is next
	 * chosen.
	 *
	 * NOTE(review): only the prune state's execparamids are consulted here;
	 * whether a set derived via join_prune_paramids also needs resetting is
	 * not visible from this file -- confirm.
	 */
	if (node->as_prune_state != NULL &&
		bms_overlap(node->ps.chgParam, node->as_prune_state->execparamids))
	{
		bms_free(node->as_valid_subplans);
		node->as_valid_subplans = NULL;
	}

	for (plan_no = 0; plan_no < node->as_nplans; plan_no++)
	{
		PlanState  *child = node->appendplans[plan_no];

		/*
		 * The generic ExecReScan code is unaware of Append's subplan array,
		 * so changed-parameter signaling has to be done by hand.
		 */
		if (node->ps.chgParam != NULL)
			UpdateChangedParamSet(child, node->ps.chgParam);

		/*
		 * A child with pending parameter changes gets rescanned by its first
		 * ExecProcNode call; nothing to do for it now.
		 */
		if (child->chgParam != NULL)
			continue;

		ExecReScan(child);
	}

	/* The choose_next_subplan_* callback will select the first subplan. */
	node->as_whichplan = INVALID_SUBPLAN_INDEX;
}

/* ----------------------------------------------------------------
 *						Parallel Append Support
 * ----------------------------------------------------------------
 */

/* ----------------------------------------------------------------
 *		ExecAppendEstimate
 *
 *		Work out how many bytes of shared memory the
 *		ParallelAppendState needs (fixed header plus one
 *		pa_finished flag per initialized subplan), remember that in
 *		node->pstate_len, and reserve one chunk of that size and one
 *		key with pcxt->estimator.
 * ----------------------------------------------------------------
 */
void
ExecAppendEstimate(AppendState *node,
				   ParallelContext *pcxt)
{
	size_t		flag_bytes = sizeof(bool) * node->as_nplans;

	/* add_size() is used for the header + flags sum */
	node->pstate_len = add_size(offsetof(ParallelAppendState, pa_finished),
								flag_bytes);

	shm_toc_estimate_chunk(&pcxt->estimator, node->pstate_len);
	shm_toc_estimate_keys(&pcxt->estimator, 1);
}


/* ----------------------------------------------------------------
 *		ExecAppendInitializeDSM
 *
 *		Allocate and zero the ParallelAppendState in the parallel
 *		context's table of contents, publish it under this plan
 *		node's id, and switch this process to the leader's
 *		subplan-selection strategy.
 *
 *		Relies on node->pstate_len having been set by
 *		ExecAppendEstimate.
 * ----------------------------------------------------------------
 */
void
ExecAppendInitializeDSM(AppendState *node,
						ParallelContext *pcxt)
{
	ParallelAppendState *shared = shm_toc_allocate(pcxt->toc,
												   node->pstate_len);

	/* Zeroing yields pa_next_plan = 0 and all pa_finished flags false. */
	memset(shared, 0, node->pstate_len);
	LWLockInitialize(&shared->pa_lock, LWTRANCHE_PARALLEL_APPEND);

	/* Make the state discoverable by workers (ExecAppendInitializeWorker). */
	shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id, shared);

	node->as_pstate = shared;
	node->choose_next_subplan = choose_next_subplan_for_leader;
}

/* ----------------------------------------------------------------
 *		ExecAppendReInitializeDSM
 *
 *		Put the shared state back to its initial condition before a
 *		fresh scan: the shared cursor returns to subplan 0 and every
 *		pa_finished flag is cleared.  pcxt is not used.
 * ----------------------------------------------------------------
 */
void
ExecAppendReInitializeDSM(AppendState *node, ParallelContext *pcxt)
{
	ParallelAppendState *shared = node->as_pstate;
	size_t		flag_bytes = sizeof(bool) * node->as_nplans;

	shared->pa_next_plan = 0;
	memset(shared->pa_finished, 0, flag_bytes);
}

/* ----------------------------------------------------------------
 *		ExecAppendInitializeWorker
 *
 *		Attach a worker to the shared ParallelAppendState that was
 *		published under this plan node's id, and switch the node to
 *		the worker's subplan-selection strategy.
 * ----------------------------------------------------------------
 */
void
ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt)
{
	int			plan_id = node->ps.plan->plan_node_id;

	/* noError = false is passed to the lookup */
	node->as_pstate = shm_toc_lookup(pwcxt->toc, plan_id, false);
	node->choose_next_subplan = choose_next_subplan_for_worker;
}

/* ----------------------------------------------------------------
 *		choose_next_subplan_locally
 *
 *		Subplan selection for a non-parallel-aware Append.  Advances
 *		as_whichplan to the next valid subplan in the current scan
 *		direction.
 *
 *		Returns true if a subplan was selected, false if there are
 *		no more (as_whichplan is left untouched in that case).
 * ----------------------------------------------------------------
 */
static bool
choose_next_subplan_locally(AppendState *node)
{
	int			cursor = node->as_whichplan;
	int			chosen;

	/* We should never be called when there are no subplans */
	Assert(cursor != NO_MATCHING_SUBPLANS);

	if (cursor == INVALID_SUBPLAN_INDEX)
	{
		/*
		 * First call.  Compute the set of valid subplans if it has not been
		 * determined yet.
		 */
		if (node->as_valid_subplans == NULL)
		{
			Append	   *plan = (Append *) node->ps.plan;

			node->as_valid_subplans =
				ExecFindMatchingSubPlans(node->as_prune_state,
										 node->ps.state,
										 list_length(plan->appendplans),
										 plan->join_prune_paramids);
		}

		/*
		 * Starting the search from -1 makes the bms member functions below
		 * hand back the first valid subplan for the scan direction, or a
		 * negative number if the set is empty.
		 */
		cursor = -1;
	}

	/* Ensure the search position is within the expected range */
	Assert(cursor >= -1 && cursor <= node->as_nplans);

	chosen = ScanDirectionIsForward(node->ps.state->es_direction)
		? bms_next_member(node->as_valid_subplans, cursor)
		: bms_prev_member(node->as_valid_subplans, cursor);

	if (chosen >= 0)
	{
		node->as_whichplan = chosen;
		return true;
	}

	/* Ran off the end of the valid set. */
	return false;
}

/* ----------------------------------------------------------------
 *		choose_next_subplan_for_leader
 *
 *		Subplan selection for the leader of a parallel-aware Append.
 *		Try to pick a plan which doesn't commit us to doing much
 *		work locally, so that as much work as possible is done in
 *		the workers.  Cheapest subplans are at the end, so the
 *		leader searches from the last subplan towards the first.
 *
 *		Returns true if a subplan was selected, false if none is
 *		left; in the latter case as_whichplan and the shared
 *		pa_next_plan are both reset to INVALID_SUBPLAN_INDEX.
 * ----------------------------------------------------------------
 */
static bool
choose_next_subplan_for_leader(AppendState *node)
{
	ParallelAppendState *shared = node->as_pstate;
	bool		found = true;

	/* Backward scan is not supported by parallel-aware plans */
	Assert(ScanDirectionIsForward(node->ps.state->es_direction));

	/* We should never be called when there are no subplans */
	Assert(node->as_whichplan != NO_MATCHING_SUBPLANS);

	LWLockAcquire(&shared->pa_lock, LW_EXCLUSIVE);

	if (node->as_whichplan == INVALID_SUBPLAN_INDEX)
	{
		/* First call: begin the search at the last subplan. */
		node->as_whichplan = node->as_nplans - 1;

		/*
		 * Determine the valid subplans now if that has not happened yet.
		 * Without run-time pruning the set is already filled in with all
		 * subplans.
		 */
		if (node->as_valid_subplans == NULL)
		{
			Append	   *plan = (Append *) node->ps.plan;

			node->as_valid_subplans =
				ExecFindMatchingSubPlans(node->as_prune_state,
										 node->ps.state,
										 list_length(plan->appendplans),
										 plan->join_prune_paramids);

			/*
			 * Flag every invalid plan as finished so that the search below
			 * steps over it.
			 */
			mark_invalid_subplans_as_finished(node);
		}
	}
	else
	{
		/* The subplan we were running has just been completed. */
		shared->pa_finished[node->as_whichplan] = true;
	}

	/*
	 * Walk downwards until an unfinished subplan turns up.  There is no need
	 * to consult as_valid_subplans here because invalid plans have been
	 * marked as finished.
	 */
	for (;;)
	{
		if (!shared->pa_finished[node->as_whichplan])
			break;

		if (node->as_whichplan == 0)
		{
			/* Everything is finished; tell the workers as well. */
			shared->pa_next_plan = INVALID_SUBPLAN_INDEX;
			node->as_whichplan = INVALID_SUBPLAN_INDEX;
			found = false;
			break;
		}

		node->as_whichplan--;
	}

	/* A non-partial plan must not be picked by anyone else. */
	if (found && node->as_whichplan < node->as_first_partial_plan)
		shared->pa_finished[node->as_whichplan] = true;

	LWLockRelease(&shared->pa_lock);

	return found;
}

/* ----------------------------------------------------------------
 *		choose_next_subplan_for_worker
 *
 *		Choose next subplan for a parallel-aware Append, returning
 *		false if there are no more.
 *
 *		We start from the first plan and advance through the list;
 *		when we reach the end, we loop back to the first partial
 *		plan.  This assigns the non-partial plans first in
 *		order of descending cost and then spreads out the workers
 *		as evenly as possible across the remaining partial plans.
 *
 *		The search position is the shared pa_next_plan cursor, which
 *		is read and updated only while holding pa_lock.  On success
 *		as_whichplan is the chosen subplan and pa_next_plan has been
 *		advanced for the next caller.
 * ----------------------------------------------------------------
 */
static bool
choose_next_subplan_for_worker(AppendState *node)
{
	ParallelAppendState *pstate = node->as_pstate;

	/* Backward scan is not supported by parallel-aware plans */
	Assert(ScanDirectionIsForward(node->ps.state->es_direction));

	/* We should never be called when there are no subplans */
	Assert(node->as_whichplan != NO_MATCHING_SUBPLANS);

	LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);

	/* Mark just-completed subplan as finished. */
	if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
		node->as_pstate->pa_finished[node->as_whichplan] = true;

	/*
	 * If we've yet to determine the valid subplans then do so now.  If
	 * run-time pruning is disabled then the valid subplans will always be set
	 * to all subplans.
	 *
	 * (This is the else-branch of the test above, so it is only considered
	 * when no subplan has been run yet.)
	 */
	else if (node->as_valid_subplans == NULL)
	{
		Append	   *plan = (Append *) node->ps.plan;

		node->as_valid_subplans =
			ExecFindMatchingSubPlans(node->as_prune_state,
									 node->ps.state,
									 list_length(plan->appendplans),
									 plan->join_prune_paramids);
		mark_invalid_subplans_as_finished(node);
	}

	/* If all the plans are already done, we have nothing to do */
	if (pstate->pa_next_plan == INVALID_SUBPLAN_INDEX)
	{
		LWLockRelease(&pstate->pa_lock);
		return false;
	}

	/*
	 * Save the plan from which we are starting the search.  as_whichplan
	 * serves as the wrap-around marker: arriving back at it means every
	 * candidate has been examined.
	 */
	node->as_whichplan = pstate->pa_next_plan;

	/* Loop until we find a valid subplan to execute. */
	while (pstate->pa_finished[pstate->pa_next_plan])
	{
		int			nextplan;

		nextplan = bms_next_member(node->as_valid_subplans,
								   pstate->pa_next_plan);
		if (nextplan >= 0)
		{
			/* Advance to the next valid plan. */
			pstate->pa_next_plan = nextplan;
		}
		else if (node->as_whichplan > node->as_first_partial_plan)
		{
			/*
			 * Try looping back to the first valid partial plan, if there is
			 * one.  If there isn't, arrange to bail out below.
			 */
			nextplan = bms_next_member(node->as_valid_subplans,
									   node->as_first_partial_plan - 1);
			pstate->pa_next_plan =
				nextplan < 0 ? node->as_whichplan : nextplan;
		}
		else
		{
			/*
			 * At last plan, and either there are no partial plans or we've
			 * tried them all.  Arrange to bail out.
			 */
			pstate->pa_next_plan = node->as_whichplan;
		}

		/* Back at the starting point: nothing unfinished was found. */
		if (pstate->pa_next_plan == node->as_whichplan)
		{
			/* We've tried everything! */
			pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
			LWLockRelease(&pstate->pa_lock);
			return false;
		}
	}

	/* Pick the plan we found, and advance pa_next_plan one more time. */
	node->as_whichplan = pstate->pa_next_plan;
	pstate->pa_next_plan = bms_next_member(node->as_valid_subplans,
										   pstate->pa_next_plan);

	/*
	 * If there are no more valid plans then try setting the next plan to the
	 * first valid partial plan.
	 */
	if (pstate->pa_next_plan < 0)
	{
		int			nextplan = bms_next_member(node->as_valid_subplans,
											   node->as_first_partial_plan - 1);

		if (nextplan >= 0)
			pstate->pa_next_plan = nextplan;
		else
		{
			/*
			 * There are no valid partial plans, and we already chose the last
			 * non-partial plan; so flag that there's nothing more for our
			 * fellow workers to do.
			 */
			pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
		}
	}

	/* If non-partial, immediately mark as finished. */
	if (node->as_whichplan < node->as_first_partial_plan)
		node->as_pstate->pa_finished[node->as_whichplan] = true;

	LWLockRelease(&pstate->pa_lock);

	return true;
}

/*
 * mark_invalid_subplans_as_finished
 *		Set the shared pa_finished flag of every subplan that is not a
 *		member of as_valid_subplans, so that the parallel selection loops
 *		never hand such a subplan out.
 *
 * Intended only for parallel Append with run-time pruning enabled; both
 * preconditions are asserted.  The function does not take pa_lock itself;
 * its callers in this file invoke it while holding the lock.
 */
static void
mark_invalid_subplans_as_finished(AppendState *node)
{
	int			plan_no;

	/* Only valid to call this while in parallel Append mode */
	Assert(node->as_pstate);

	/* Shouldn't have been called when run-time pruning is not enabled */
	Assert(node->as_prune_state);

	/* Every subplan is valid: there is nothing to flag. */
	if (bms_num_members(node->as_valid_subplans) == node->as_nplans)
		return;

	for (plan_no = 0; plan_no < node->as_nplans; plan_no++)
	{
		if (bms_is_member(plan_no, node->as_valid_subplans))
			continue;

		node->as_pstate->pa_finished[plan_no] = true;
	}
}

/*
 * ExecSquelchAppend
 *		Forward a squelch request to every initialized subplan by calling
 *		ExecSquelchNode on each entry of the subplan array, in order.
 *
 * NOTE(review): presumably the Greenplum "no more tuples are needed"
 * notification -- confirm against ExecSquelchNode.
 */
void
ExecSquelchAppend(AppendState *node)
{
	int			plan_no = 0;

	while (plan_no < node->as_nplans)
	{
		ExecSquelchNode(node->appendplans[plan_no]);
		plan_no++;
	}
}

相关信息

greenplumn 源码目录

相关文章

greenplumn execAmi 源码

greenplumn execCurrent 源码

greenplumn execExpr 源码

greenplumn execExprInterp 源码

greenplumn execGrouping 源码

greenplumn execIndexing 源码

greenplumn execJunk 源码

greenplumn execMain 源码

greenplumn execParallel 源码

greenplumn execPartition 源码

0  赞