greenplumn nodeMergeAppend 源码
greenplumn nodeMergeAppend 代码
文件路径:/src/backend/executor/nodeMergeAppend.c
/*-------------------------------------------------------------------------
*
* nodeMergeAppend.c
* routines to handle MergeAppend nodes.
*
* Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
* src/backend/executor/nodeMergeAppend.c
*
*-------------------------------------------------------------------------
*/
/* INTERFACE ROUTINES
* ExecInitMergeAppend - initialize the MergeAppend node
* ExecMergeAppend - retrieve the next tuple from the node
* ExecEndMergeAppend - shut down the MergeAppend node
* ExecReScanMergeAppend - rescan the MergeAppend node
*
* NOTES
* A MergeAppend node contains a list of one or more subplans.
* These are each expected to deliver tuples that are sorted according
* to a common sort key. The MergeAppend node merges these streams
* to produce output sorted the same way.
*
* MergeAppend nodes don't make use of their left and right
* subtrees, rather they maintain a list of subplans so
* a typical MergeAppend node looks like this in the plan tree:
*
* ...
* /
* MergeAppend---+------+------+--- nil
* / \ | | |
* nil nil ... ... ...
* subplans
*/
#include "postgres.h"
#include "executor/execdebug.h"
#include "executor/execPartition.h"
#include "executor/nodeMergeAppend.h"
#include "lib/binaryheap.h"
#include "miscadmin.h"
/*
* We have one slot for each item in the heap array. We use SlotNumber
* to store slot indexes. This doesn't actually provide any formal
* type-safety, but it makes the code more self-documenting.
*/
typedef int32 SlotNumber;
static TupleTableSlot *ExecMergeAppend(PlanState *pstate);
static int heap_compare_slots(Datum a, Datum b, void *arg);
/* ----------------------------------------------------------------
* ExecInitMergeAppend
*
* Begin all of the subscans of the MergeAppend node.
* ----------------------------------------------------------------
*/
MergeAppendState *
ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
{
MergeAppendState *mergestate = makeNode(MergeAppendState);
PlanState **mergeplanstates;
Bitmapset *validsubplans;
int nplans;
int i,
j;
ListCell *lc;
/* check for unsupported flags */
Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
/*
* create new MergeAppendState for our node
*/
mergestate->ps.plan = (Plan *) node;
mergestate->ps.state = estate;
mergestate->ps.ExecProcNode = ExecMergeAppend;
mergestate->ms_noopscan = false;
/* If run-time partition pruning is enabled, then set that up now */
if (node->part_prune_info != NULL)
{
PartitionPruneState *prunestate;
/* We may need an expression context to evaluate partition exprs */
ExecAssignExprContext(estate, &mergestate->ps);
prunestate = ExecCreatePartitionPruneState(&mergestate->ps,
node->part_prune_info);
mergestate->ms_prune_state = prunestate;
/* Perform an initial partition prune, if required. */
if (prunestate->do_initial_prune)
{
/* Determine which subplans survive initial pruning */
validsubplans = ExecFindInitialMatchingSubPlans(prunestate,
list_length(node->mergeplans));
/*
* The case where no subplans survive pruning must be handled
* specially. The problem here is that code in explain.c requires
* a MergeAppend to have at least one subplan in order for it to
* properly determine the Vars in that subplan's targetlist. We
* sidestep this issue by just initializing the first subplan and
* setting ms_noopscan to true to indicate that we don't really
* need to scan any subnodes.
*/
if (bms_is_empty(validsubplans))
{
mergestate->ms_noopscan = true;
/* Mark the first as valid so that it's initialized below */
validsubplans = bms_make_singleton(0);
}
nplans = bms_num_members(validsubplans);
}
else
{
/* We'll need to initialize all subplans */
nplans = list_length(node->mergeplans);
Assert(nplans > 0);
validsubplans = bms_add_range(NULL, 0, nplans - 1);
}
/*
* If no runtime pruning is required, we can fill ms_valid_subplans
* immediately, preventing later calls to ExecFindMatchingSubPlans.
*/
if (!prunestate->do_exec_prune)
{
Assert(nplans > 0);
mergestate->ms_valid_subplans = bms_add_range(NULL, 0, nplans - 1);
}
}
else
{
nplans = list_length(node->mergeplans);
/*
* When run-time partition pruning is not enabled we can just mark all
* subplans as valid; they must also all be initialized.
*/
Assert(nplans > 0);
mergestate->ms_valid_subplans = validsubplans =
bms_add_range(NULL, 0, nplans - 1);
mergestate->ms_prune_state = NULL;
if (node->join_prune_paramids)
mergestate->ms_valid_subplans = NULL;
}
mergeplanstates = (PlanState **) palloc(nplans * sizeof(PlanState *));
mergestate->mergeplans = mergeplanstates;
mergestate->ms_nplans = nplans;
mergestate->ms_slots = (TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * nplans);
mergestate->ms_heap = binaryheap_allocate(nplans, heap_compare_slots,
mergestate);
/*
* Miscellaneous initialization
*
* MergeAppend nodes do have Result slots, which hold pointers to tuples,
* so we have to initialize them. FIXME
*/
ExecInitResultTupleSlotTL(&mergestate->ps, &TTSOpsVirtual);
/* node returns slots from each of its subnodes, therefore not fixed */
mergestate->ps.resultopsset = true;
mergestate->ps.resultopsfixed = false;
/*
* call ExecInitNode on each of the valid plans to be executed and save
* the results into the mergeplanstates array.
*/
j = i = 0;
foreach(lc, node->mergeplans)
{
if (bms_is_member(i, validsubplans))
{
Plan *initNode = (Plan *) lfirst(lc);
mergeplanstates[j++] = ExecInitNode(initNode, estate, eflags);
}
i++;
}
mergestate->ps.ps_ProjInfo = NULL;
/*
* initialize sort-key information
*/
mergestate->ms_nkeys = node->numCols;
mergestate->ms_sortkeys = palloc0(sizeof(SortSupportData) * node->numCols);
for (i = 0; i < node->numCols; i++)
{
SortSupport sortKey = mergestate->ms_sortkeys + i;
sortKey->ssup_cxt = CurrentMemoryContext;
sortKey->ssup_collation = node->collations[i];
sortKey->ssup_nulls_first = node->nullsFirst[i];
sortKey->ssup_attno = node->sortColIdx[i];
/*
* It isn't feasible to perform abbreviated key conversion, since
* tuples are pulled into mergestate's binary heap as needed. It
* would likely be counter-productive to convert tuples into an
* abbreviated representation as they're pulled up, so opt out of that
* additional optimization entirely.
*/
sortKey->abbreviate = false;
PrepareSortSupportFromOrderingOp(node->sortOperators[i], sortKey);
}
/*
* initialize to show we have not run the subplans yet
*/
mergestate->ms_initialized = false;
return mergestate;
}
/* ----------------------------------------------------------------
* ExecMergeAppend
*
* Handles iteration over multiple subplans.
* ----------------------------------------------------------------
*/
static TupleTableSlot *
ExecMergeAppend(PlanState *pstate)
{
MergeAppendState *node = castNode(MergeAppendState, pstate);
TupleTableSlot *result;
SlotNumber i;
CHECK_FOR_INTERRUPTS();
if (!node->ms_initialized)
{
/* Nothing to do if all subplans were pruned */
if (node->ms_noopscan)
return ExecClearTuple(node->ps.ps_ResultTupleSlot);
/*
* If we've yet to determine the valid subplans then do so now. If
* run-time pruning is disabled then the valid subplans will always be
* set to all subplans.
*/
if (node->ms_valid_subplans == NULL)
{
MergeAppend *plan = (MergeAppend *) node->ps.plan;
node->ms_valid_subplans =
ExecFindMatchingSubPlans(node->ms_prune_state,
node->ps.state,
list_length(plan->mergeplans),
plan->join_prune_paramids);
}
/*
* First time through: pull the first tuple from each valid subplan,
* and set up the heap.
*/
i = -1;
while ((i = bms_next_member(node->ms_valid_subplans, i)) >= 0)
{
node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
if (!TupIsNull(node->ms_slots[i]))
binaryheap_add_unordered(node->ms_heap, Int32GetDatum(i));
}
binaryheap_build(node->ms_heap);
node->ms_initialized = true;
}
else
{
/*
* Otherwise, pull the next tuple from whichever subplan we returned
* from last time, and reinsert the subplan index into the heap,
* because it might now compare differently against the existing
* elements of the heap. (We could perhaps simplify the logic a bit
* by doing this before returning from the prior call, but it's better
* to not pull tuples until necessary.)
*/
i = DatumGetInt32(binaryheap_first(node->ms_heap));
node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
if (!TupIsNull(node->ms_slots[i]))
binaryheap_replace_first(node->ms_heap, Int32GetDatum(i));
else
(void) binaryheap_remove_first(node->ms_heap);
}
if (binaryheap_empty(node->ms_heap))
{
/* All the subplans are exhausted, and so is the heap */
result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
}
else
{
i = DatumGetInt32(binaryheap_first(node->ms_heap));
result = node->ms_slots[i];
}
return result;
}
/*
* Compare the tuples in the two given slots.
*/
static int32
heap_compare_slots(Datum a, Datum b, void *arg)
{
MergeAppendState *node = (MergeAppendState *) arg;
SlotNumber slot1 = DatumGetInt32(a);
SlotNumber slot2 = DatumGetInt32(b);
TupleTableSlot *s1 = node->ms_slots[slot1];
TupleTableSlot *s2 = node->ms_slots[slot2];
int nkey;
Assert(!TupIsNull(s1));
Assert(!TupIsNull(s2));
for (nkey = 0; nkey < node->ms_nkeys; nkey++)
{
SortSupport sortKey = node->ms_sortkeys + nkey;
AttrNumber attno = sortKey->ssup_attno;
Datum datum1,
datum2;
bool isNull1,
isNull2;
int compare;
datum1 = slot_getattr(s1, attno, &isNull1);
datum2 = slot_getattr(s2, attno, &isNull2);
compare = ApplySortComparator(datum1, isNull1,
datum2, isNull2,
sortKey);
if (compare != 0)
{
INVERT_COMPARE_RESULT(compare);
return compare;
}
}
return 0;
}
/* ----------------------------------------------------------------
* ExecEndMergeAppend
*
* Shuts down the subscans of the MergeAppend node.
*
* Returns nothing of interest.
* ----------------------------------------------------------------
*/
void
ExecEndMergeAppend(MergeAppendState *node)
{
PlanState **mergeplans;
int nplans;
int i;
/*
* get information from the node
*/
mergeplans = node->mergeplans;
nplans = node->ms_nplans;
/*
* shut down each of the subscans
*/
for (i = 0; i < nplans; i++)
ExecEndNode(mergeplans[i]);
}
void
ExecReScanMergeAppend(MergeAppendState *node)
{
int i;
/*
* If any PARAM_EXEC Params used in pruning expressions have changed, then
* we'd better unset the valid subplans so that they are reselected for
* the new parameter values.
*/
if (node->ms_prune_state &&
bms_overlap(node->ps.chgParam,
node->ms_prune_state->execparamids))
{
bms_free(node->ms_valid_subplans);
node->ms_valid_subplans = NULL;
}
for (i = 0; i < node->ms_nplans; i++)
{
PlanState *subnode = node->mergeplans[i];
/*
* ExecReScan doesn't know about my subplans, so I have to do
* changed-parameter signaling myself.
*/
if (node->ps.chgParam != NULL)
UpdateChangedParamSet(subnode, node->ps.chgParam);
/*
* If chgParam of subnode is not null then plan will be re-scanned by
* first ExecProcNode.
*/
if (subnode->chgParam == NULL)
ExecReScan(subnode);
}
binaryheap_reset(node->ms_heap);
node->ms_initialized = false;
}
void
ExecSquelchMergeAppend(MergeAppendState *node)
{
int i;
for (i = 0; i < node->ms_nplans; i++)
{
ExecSquelchNode(node->mergeplans[i]);
}
}
相关信息
相关文章
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦