greenplumn nodeMotion 源码
greenplumn nodeMotion 代码
文件路径:/src/backend/executor/nodeMotion.c
/*-------------------------------------------------------------------------
*
* nodeMotion.c
* Routines to handle moving tuples around in Greenplum Database.
*
* Portions Copyright (c) 2005-2008, Greenplum inc
* Portions Copyright (c) 2012-Present VMware, Inc. or its affiliates.
*
*
* IDENTIFICATION
* src/backend/executor/nodeMotion.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/heapam.h"
#include "nodes/execnodes.h" /* Slice, SliceTable */
#include "cdb/cdbmotion.h"
#include "cdb/cdbutil.h"
#include "cdb/cdbvars.h"
#include "cdb/cdbhash.h"
#include "executor/executor.h"
#include "executor/execdebug.h"
#include "executor/execUtils.h"
#include "executor/nodeMotion.h"
#include "lib/binaryheap.h"
#include "utils/tuplesort.h"
#include "miscadmin.h"
#include "utils/memutils.h"
/* #define MEASURE_MOTION_TIME */
#ifdef MEASURE_MOTION_TIME
#include <unistd.h> /* gettimeofday */
#endif
/* #define CDB_MOTION_DEBUG */
#ifdef CDB_MOTION_DEBUG
#include "utils/lsyscache.h" /* getTypeOutputInfo */
#include "lib/stringinfo.h" /* StringInfo */
#endif
/*=========================================================================
* FUNCTIONS PROTOTYPES
*/
static TupleTableSlot *execMotionSender(MotionState *node);
static TupleTableSlot *execMotionUnsortedReceiver(MotionState *node);
static TupleTableSlot *execMotionSortedReceiver(MotionState *node);
static int CdbMergeComparator(Datum lhs, Datum rhs, void *context);
static uint32 evalHashKey(ExprContext *econtext, List *hashkeys, CdbHash *h);
static void doSendEndOfStream(Motion *motion, MotionState *node);
static void doSendTuple(Motion *motion, MotionState *node, TupleTableSlot *outerTupleSlot);
/*=========================================================================
*/
#ifdef CDB_MOTION_DEBUG
static void
formatTuple(StringInfo buf, TupleTableSlot *slot, Oid *outputFunArray)
{
TupleDesc tupdesc = slot->tts_tupleDescriptor;
int i;
for (i = 0; i < tupdesc->natts; i++)
{
bool isnull;
Datum d = slot_getattr(slot, i + 1, &isnull);
if (d && !isnull)
{
char *s = OidOutputFunctionCall(outputFunArray[i], d);
char *name = NameStr(tupdesc->attrs[i].attname);
if (name && *name)
appendStringInfo(buf, " %s=\"%.30s\"", name, s);
else
appendStringInfo(buf, " \"%.30s\"", s);
pfree(s);
}
}
appendStringInfoChar(buf, '\n');
}
#endif
/* ----------------------------------------------------------------
* ExecMotion
* ----------------------------------------------------------------
*/
static TupleTableSlot *
ExecMotion(PlanState *pstate)
{
MotionState *node = castNode(MotionState, pstate);
Motion *motion = (Motion *) node->ps.plan;
/*
* Check for interrupts. Without this we've seen the scenario before that
* it could be quite slow to cancel a query that selects all the tuples
* from a big distributed table because the motion node on QD has no chance
* of checking the cancel signal.
*/
CHECK_FOR_INTERRUPTS();
/* sanity check */
if (node->stopRequested)
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("unexpected internal error"),
errmsg("Already stopped motion node is executed again, data will lost"),
errhint("Likely motion node is incorrectly squelched earlier")));
/*
* at the top here we basically decide: -- SENDER vs. RECEIVER and --
* SORTED vs. UNSORTED
*/
if (node->mstype == MOTIONSTATE_RECV)
{
TupleTableSlot *tuple;
#ifdef MEASURE_MOTION_TIME
struct timeval startTime;
struct timeval stopTime;
gettimeofday(&startTime, NULL);
#endif
if (node->ps.state->active_recv_id >= 0)
{
if (node->ps.state->active_recv_id != motion->motionID)
{
/*
* See motion_sanity_walker() for details on how a deadlock
* may occur.
*/
elog(LOG, "DEADLOCK HAZARD: Updating active_motion_id from %d to %d",
node->ps.state->active_recv_id, motion->motionID);
node->ps.state->active_recv_id = motion->motionID;
}
}
else
node->ps.state->active_recv_id = motion->motionID;
if (motion->sendSorted)
tuple = execMotionSortedReceiver(node);
else
tuple = execMotionUnsortedReceiver(node);
/*
* We tell the upper node as if this was the end of tuple stream if
* query-finish is requested. Unlike other nodes, we skipped this
* check in ExecProc because this node in sender mode should send EoS
* to the receiver side, but the receiver side can simply stop
* processing the stream. The sender side of this stream could still
* be sending more tuples, but this slice will eventually clean up the
* executor and eventually Stop message will be delivered to the
* sender side.
*/
if (QueryFinishPending)
tuple = NULL;
if (tuple == NULL)
node->ps.state->active_recv_id = -1;
#ifdef MEASURE_MOTION_TIME
gettimeofday(&stopTime, NULL);
node->motionTime.tv_sec += stopTime.tv_sec - startTime.tv_sec;
node->motionTime.tv_usec += stopTime.tv_usec - startTime.tv_usec;
while (node->motionTime.tv_usec < 0)
{
node->motionTime.tv_usec += 1000000;
node->motionTime.tv_sec--;
}
while (node->motionTime.tv_usec >= 1000000)
{
node->motionTime.tv_usec -= 1000000;
node->motionTime.tv_sec++;
}
#endif
return tuple;
}
else if (node->mstype == MOTIONSTATE_SEND)
{
return execMotionSender(node);
}
else
{
elog(ERROR, "cannot execute inactive Motion");
return NULL;
}
}
static TupleTableSlot *
execMotionSender(MotionState *node)
{
/* SENDER LOGIC */
TupleTableSlot *outerTupleSlot;
PlanState *outerNode;
Motion *motion = (Motion *) node->ps.plan;
bool done = false;
#ifdef MEASURE_MOTION_TIME
struct timeval time1;
struct timeval time2;
gettimeofday(&time1, NULL);
#endif
AssertState(motion->motionType == MOTIONTYPE_GATHER ||
motion->motionType == MOTIONTYPE_GATHER_SINGLE ||
motion->motionType == MOTIONTYPE_HASH ||
motion->motionType == MOTIONTYPE_BROADCAST ||
(motion->motionType == MOTIONTYPE_EXPLICIT && motion->segidColIdx > 0));
Assert(node->ps.state->interconnect_context);
while (!done)
{
/* grab TupleTableSlot from our child. */
outerNode = outerPlanState(node);
outerTupleSlot = ExecProcNode(outerNode);
#ifdef MEASURE_MOTION_TIME
gettimeofday(&time2, NULL);
node->otherTime.tv_sec += time2.tv_sec - time1.tv_sec;
node->otherTime.tv_usec += time2.tv_usec - time1.tv_usec;
while (node->otherTime.tv_usec < 0)
{
node->otherTime.tv_usec += 1000000;
node->otherTime.tv_sec--;
}
while (node->otherTime.tv_usec >= 1000000)
{
node->otherTime.tv_usec -= 1000000;
node->otherTime.tv_sec++;
}
#endif
if (done || TupIsNull(outerTupleSlot))
{
doSendEndOfStream(motion, node);
done = true;
}
else if (motion->motionType == MOTIONTYPE_GATHER_SINGLE &&
GpIdentity.segindex != (gp_session_id % node->numInputSegs))
{
/*
* For explicit gather motion, receiver gets data from one
* segment only. The others execute the subplan normally, but
* throw away the resulting tuples.
*/
}
else
{
doSendTuple(motion, node, outerTupleSlot);
/* doSendTuple() may have set node->stopRequested as a side-effect */
if (node->stopRequested)
{
elog(gp_workfile_caching_loglevel, "Motion calling Squelch on child node");
/* propagate stop notification to our children */
ExecSquelchNode(outerNode);
done = true;
}
}
#ifdef MEASURE_MOTION_TIME
gettimeofday(&time1, NULL);
node->motionTime.tv_sec += time1.tv_sec - time2.tv_sec;
node->motionTime.tv_usec += time1.tv_usec - time2.tv_usec;
while (node->motionTime.tv_usec < 0)
{
node->motionTime.tv_usec += 1000000;
node->motionTime.tv_sec--;
}
while (node->motionTime.tv_usec >= 1000000)
{
node->motionTime.tv_usec -= 1000000;
node->motionTime.tv_sec++;
}
#endif
}
Assert(node->stopRequested || node->numTuplesFromChild == node->numTuplesToAMS);
/* nothing else to send out, so we return NULL up the tree. */
return NULL;
}
static TupleTableSlot *
execMotionUnsortedReceiver(MotionState *node)
{
/* RECEIVER LOGIC */
TupleTableSlot *slot;
MinimalTuple tuple;
Motion *motion = (Motion *) node->ps.plan;
EState *estate = node->ps.state;
AssertState(motion->motionType == MOTIONTYPE_GATHER ||
motion->motionType == MOTIONTYPE_GATHER_SINGLE ||
motion->motionType == MOTIONTYPE_HASH ||
motion->motionType == MOTIONTYPE_BROADCAST ||
(motion->motionType == MOTIONTYPE_EXPLICIT && motion->segidColIdx > 0));
Assert(node->ps.state->motionlayer_context);
if (node->stopRequested)
{
SendStopMessage(node->ps.state->motionlayer_context,
node->ps.state->interconnect_context,
motion->motionID);
return NULL;
}
if (estate->interconnect_context == NULL)
{
if (!estate->es_interconnect_is_setup && estate->dispatcherState &&
!estate->es_got_eos)
{
/*
* We could only possibly get here in the following scenario:
* 1. We are QD gracefully aborting a transaction.
* 2. We have torn down the interconnect of the current slice.
* 3. Since an error has happened, we no longer need to finish fetching
* all the tuples, hence squelching the executor subtree.
* 4. We are in the process of ExecSquelchShareInputScan(), and the
* Shared Scan has this Motion below it.
*
* NB: if you need to change this, see also execMotionSortedReceiver()
*/
ereport(NOTICE,
(errmsg("An ERROR must have happened. Stopping a Shared Scan.")));
return NULL;
}
else
ereport(ERROR, (errmsg("Interconnect is down unexpectedly.")));
}
tuple = RecvTupleFrom(node->ps.state->motionlayer_context,
node->ps.state->interconnect_context,
motion->motionID, ANY_ROUTE);
if (!tuple)
{
#ifdef CDB_MOTION_DEBUG
if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
elog(DEBUG4, "motionID=%d saw end of stream", motion->motionID);
#endif
Assert(node->numTuplesFromAMS == node->numTuplesToParent);
Assert(node->numTuplesFromChild == 0);
Assert(node->numTuplesToAMS == 0);
return NULL;
}
node->numTuplesFromAMS++;
node->numTuplesToParent++;
/* store it in our result slot and return this. */
slot = node->ps.ps_ResultTupleSlot;
slot = ExecStoreMinimalTuple(tuple, slot, true /* shouldFree */ );
#ifdef CDB_MOTION_DEBUG
if (node->numTuplesToParent <= 20)
{
StringInfoData buf;
initStringInfo(&buf);
appendStringInfo(&buf, " motion%-3d rcv %5d.",
motion->motionID,
node->numTuplesToParent);
formatTuple(&buf, slot, node->outputFunArray);
elog(DEBUG3, "%s", buf.data);
pfree(buf.data);
}
#endif
return slot;
}
/*
* General background on Sorted Motion:
* -----------------------------------
* NOTE: This function is only used for order-preserving motion. There are
* only 2 types of motion that order-preserving makes sense for: FIXED and
* BROADCAST (HASH does not make sense). so we have:
*
* CASE 1: broadcast order-preserving fixed motion. This should only be
* called for SENDERs.
*
* CASE 2: single-destination order-preserving fixed motion. The SENDER
* side will act like Unsorted motion and won't call this. So only
* the RECEIVER should be called for this case.
*
*
* Sorted Receive Notes:
* --------------------
*
* The 1st time we execute, we need to pull a tuple from each of our source
* and store them in our tupleheap. Once that is done, we can pick the lowest
* (or whatever the criterion is) value from amongst all the sources. This
* works since each stream is sorted itself.
*
* We keep track of which one was selected, this will be slot we will need
* to fill during the next call.
*
* Subsequent calls to this function (after the 1st time) will start by
* trying to receive a tuple for the slot that was emptied the previous call.
* Then we again select the lowest value and return that tuple.
*/
/* Sorted receiver using binary heap */
static TupleTableSlot *
execMotionSortedReceiver(MotionState *node)
{
TupleTableSlot *slot;
binaryheap *hp = node->tupleheap;
MinimalTuple inputTuple;
Motion *motion = (Motion *) node->ps.plan;
EState *estate = node->ps.state;
AssertState(motion->motionType == MOTIONTYPE_GATHER &&
motion->sendSorted &&
hp != NULL);
/* Notify senders and return EOS if caller doesn't want any more data. */
if (node->stopRequested)
{
SendStopMessage(node->ps.state->motionlayer_context,
node->ps.state->interconnect_context,
motion->motionID);
return NULL;
}
if (estate->interconnect_context == NULL)
{
if (!estate->es_interconnect_is_setup && estate->dispatcherState &&
!estate->es_got_eos)
{
/*
* We could only possibly get here in the following scenario:
* 1. We are QD gracefully aborting a transaction.
* 2. We have torn down the interconnect of the current slice.
* 3. Since an error has happened, we no longer need to finish fetching
* all the tuples, hence squelching the executor subtree.
* 4. We are in the process of ExecSquelchShareInputScan(), and the
* Shared Scan has this Motion below it.
*
* NB: if you need to change this, see also execMotionUnsortedReceiver()
*/
ereport(NOTICE,
(errmsg("An ERROR must have happened. Stopping a Shared Scan.")));
return NULL;
}
else
ereport(ERROR, (errmsg("Interconnect is down unexpectedly.")));
}
/*
* On first call, fill the priority queue with each sender's first tuple.
*/
if (!node->tupleheapReady)
{
MinimalTuple inputTuple;
binaryheap *hp = node->tupleheap;
Motion *motion = (Motion *) node->ps.plan;
int iSegIdx;
ListCell *lcProcess;
ExecSlice *sendSlice = &node->ps.state->es_sliceTable->slices[motion->motionID];
Assert(sendSlice->sliceIndex == motion->motionID);
foreach_with_count(lcProcess, sendSlice->primaryProcesses, iSegIdx)
{
MemoryContext oldcxt;
if (lfirst(lcProcess) == NULL)
continue; /* skip this one: we are not receiving from it */
inputTuple = RecvTupleFrom(node->ps.state->motionlayer_context,
node->ps.state->interconnect_context,
motion->motionID, iSegIdx);
if (!inputTuple)
continue; /* skip this one: received nothing */
/*
* Make a slot to hold this tuple. We will reuse it to hold any
* future tuples from the same sender. We initialized the result
* tuple slot with the correct type earlier, so make the new slot
* have the same type.
*/
oldcxt = MemoryContextSwitchTo(estate->es_query_cxt);
node->slots[iSegIdx] = MakeTupleTableSlot(node->ps.ps_ResultTupleSlot->tts_tupleDescriptor,
&TTSOpsMinimalTuple);
MemoryContextSwitchTo(oldcxt);
/*
* Store the tuple in the slot, and add it to the heap.
*
* Use slot_getsomeattrs() to materialize the columns we need for
* the comparisons in the tts_values/isnull arrays. The comparator
* can then peek directly into the arrays, which is cheaper than
* calling slot_getattr() all the time.
*/
ExecStoreMinimalTuple(inputTuple, node->slots[iSegIdx], true);
slot_getsomeattrs(node->slots[iSegIdx], node->lastSortColIdx);
binaryheap_add_unordered(hp, iSegIdx);
node->numTuplesFromAMS++;
#ifdef CDB_MOTION_DEBUG
if (node->numTuplesFromAMS <= 20)
{
StringInfoData buf;
initStringInfo(&buf);
appendStringInfo(&buf, " motion%-3d rcv<-%-3d %5d.",
motion->motionID,
iSegIdx,
node->numTuplesFromAMS);
formatTuple(&buf, node->slots[iSegIdx], node->outputFunArray);
elog(DEBUG3, "%s", buf.data);
pfree(buf.data);
}
#endif
}
Assert(iSegIdx == node->numInputSegs);
/*
* Done adding the elements, now arrange the heap to satisfy the heap
* property. This is quicker than inserting the initial elements one by
* one.
*/
binaryheap_build(hp);
node->tupleheapReady = true;
}
/*
* Delete from the priority queue the element that we fetched last time.
* Receive and insert the next tuple from that same sender.
*/
else
{
/* sanity check */
if (binaryheap_empty(hp))
elog(ERROR, "sorted Gather Motion called again after already receiving all data");
/* Old element is still at the head of the pq. */
Assert(DatumGetInt32(binaryheap_first(hp)) == node->routeIdNext);
/* Receive the successor of the tuple that we returned last time. */
inputTuple = RecvTupleFrom(node->ps.state->motionlayer_context,
node->ps.state->interconnect_context,
motion->motionID,
node->routeIdNext);
/* Substitute it in the pq for its predecessor. */
if (inputTuple)
{
ExecStoreMinimalTuple(inputTuple, node->slots[node->routeIdNext], true);
slot_getsomeattrs(node->slots[node->routeIdNext], node->lastSortColIdx);
binaryheap_replace_first(hp, Int32GetDatum(node->routeIdNext));
node->numTuplesFromAMS++;
#ifdef CDB_MOTION_DEBUG
if (node->numTuplesFromAMS <= 20)
{
StringInfoData buf;
initStringInfo(&buf);
appendStringInfo(&buf, " motion%-3d rcv<-%-3d %5d.",
motion->motionID,
node->routeIdNext,
node->numTuplesFromAMS);
formatTuple(&buf, node->slots[node->routeIdNext], node->outputFunArray);
elog(DEBUG3, "%s", buf.data);
pfree(buf.data);
}
#endif
}
else
{
/* At EOS, drop this sender from the priority queue. */
binaryheap_remove_first(hp);
}
}
/* Finished if all senders have returned EOS. */
if (binaryheap_empty(hp))
{
Assert(node->numTuplesFromAMS == node->numTuplesToParent);
Assert(node->numTuplesFromChild == 0);
Assert(node->numTuplesToAMS == 0);
return NULL;
}
/*
* Our next result tuple, with lowest key among all senders, is now at the
* head of the priority queue. Get it from there.
*
* We transfer ownership of the tuple from the pq element to our caller,
* but the pq element itself will remain in place until the next time we
* are called, to avoid an unnecessary rearrangement of the priority
* queue.
*/
node->routeIdNext = binaryheap_first(hp);
slot = node->slots[node->routeIdNext];
/* Update counters. */
node->numTuplesToParent++;
#ifdef CDB_MOTION_DEBUG
if (node->numTuplesToParent <= 20)
{
StringInfoData buf;
initStringInfo(&buf);
appendStringInfo(&buf, " motion%-3d mrg<-%-3d %5d.",
motion->motionID,
node->routeIdNext,
node->numTuplesToParent);
formatTuple(&buf, slot, node->outputFunArray);
elog(DEBUG3, "%s", buf.data);
pfree(buf.data);
}
#endif
/* Return result slot. */
return slot;
} /* execMotionSortedReceiver */
/* ----------------------------------------------------------------
* ExecInitMotion
*
* NOTE: have to be a bit careful, estate->es_cur_slice_idx is not the
* ultimate correct value that it should be on the QE. this happens
* after this call in mppexec.c. This is ok since we don't need it,
* but just be aware before you try and use it here.
* ----------------------------------------------------------------
*/
MotionState *
ExecInitMotion(Motion *node, EState *estate, int eflags)
{
MotionState *motionstate = NULL;
TupleDesc tupDesc;
ExecSlice *sendSlice;
ExecSlice *recvSlice;
SliceTable *sliceTable = estate->es_sliceTable;
PlanState *outerPlan;
int parentIndex;
/*
* If GDD is enabled, the lock of table may downgrade to RowExclusiveLock,
* (see CdbTryOpenRelation function), then EPQ would be triggered, EPQ will
* execute the subplan in the executor, so it will create a new EState,
* but there are no slice tables in the new EState and we can not AssignGangs
* on the QE. In this case, we raise an error.
*/
if (estate->es_epqTupleSlot)
ereport(ERROR,
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
errmsg("EvalPlanQual can not handle subPlan with Motion node")));
Assert(node->motionID > 0);
Assert(node->motionID < sliceTable->numSlices);
AssertImply(node->motionType == MOTIONTYPE_HASH, node->numHashSegments > 0);
parentIndex = estate->currentSliceId;
estate->currentSliceId = node->motionID;
/*
* create state structure
*/
motionstate = makeNode(MotionState);
motionstate->ps.plan = (Plan *) node;
motionstate->ps.state = estate;
motionstate->ps.ExecProcNode = ExecMotion;
motionstate->mstype = MOTIONSTATE_NONE;
motionstate->stopRequested = false;
motionstate->hashExprs = NIL;
motionstate->cdbhash = NULL;
/* Look up the sending and receiving gang's slice table entries. */
sendSlice = &sliceTable->slices[node->motionID];
Assert(sendSlice->sliceIndex == node->motionID);
recvSlice = &sliceTable->slices[parentIndex];
Assert(parentIndex == sendSlice->parentIndex);
/* QD must fill in the global slice table. */
if (Gp_role == GP_ROLE_DISPATCH)
{
MemoryContext oldcxt = MemoryContextSwitchTo(estate->es_query_cxt);
if (node->motionType == MOTIONTYPE_GATHER ||
node->motionType == MOTIONTYPE_GATHER_SINGLE)
{
/* Sending to a single receiving process on the entry db? */
/* Is receiving slice a root slice that runs here in the qDisp? */
if (recvSlice->sliceIndex == recvSlice->rootIndex)
{
motionstate->mstype = MOTIONSTATE_RECV;
/* For parallel retrieve cursor, the motion's gang type could be set as
* GANGTYPE_ENTRYDB_READER explicitly*/
Assert(recvSlice->gangType == GANGTYPE_UNALLOCATED ||
recvSlice->gangType == GANGTYPE_ENTRYDB_READER ||
recvSlice->gangType == GANGTYPE_PRIMARY_WRITER ||
recvSlice->gangType == GANGTYPE_PRIMARY_READER);
}
else
{
/* sanity checks */
if (list_length(recvSlice->segments) != 1)
elog(ERROR, "unexpected gang size: %d", list_length(recvSlice->segments));
}
}
MemoryContextSwitchTo(oldcxt);
}
/* QE must fill in map from motionID to MotionState node. */
else
{
Assert(Gp_role == GP_ROLE_EXECUTE);
if (LocallyExecutingSliceIndex(estate) == recvSlice->sliceIndex)
{
/* this is recv */
motionstate->mstype = MOTIONSTATE_RECV;
}
else if (LocallyExecutingSliceIndex(estate) == sendSlice->sliceIndex)
{
/* this is send */
motionstate->mstype = MOTIONSTATE_SEND;
}
/* TODO: If neither sending nor receiving, don't bother to initialize. */
}
motionstate->tupleheapReady = false;
motionstate->sentEndOfStream = false;
motionstate->otherTime.tv_sec = 0;
motionstate->otherTime.tv_usec = 0;
motionstate->motionTime.tv_sec = 0;
motionstate->motionTime.tv_usec = 0;
motionstate->numTuplesFromChild = 0;
motionstate->numTuplesToAMS = 0;
motionstate->numTuplesFromAMS = 0;
motionstate->numTuplesToParent = 0;
motionstate->stopRequested = false;
motionstate->numInputSegs = list_length(sendSlice->segments);
/*
* Miscellaneous initialization
*
* create expression context for node
*/
ExecAssignExprContext(estate, &motionstate->ps);
/*
* Initializes child nodes. If alien elimination is on, we skip children
* of receiver motion.
*/
if (!estate->eliminateAliens || motionstate->mstype == MOTIONSTATE_SEND)
{
outerPlanState(motionstate) = ExecInitNode(outerPlan(node), estate, eflags);
}
/*
* initialize tuple type. no need to initialize projection info because
* this node doesn't do projections.
*/
outerPlan = outerPlanState(motionstate);
/*
* Initialize result type and slot
*/
ExecInitResultTupleSlotTL(&motionstate->ps, &TTSOpsMinimalTuple);
tupDesc = ExecGetResultType(&motionstate->ps);
motionstate->ps.ps_ProjInfo = NULL;
motionstate->numHashSegments = node->numHashSegments;
/* Set up motion send data structures */
if (motionstate->mstype == MOTIONSTATE_SEND && node->motionType == MOTIONTYPE_HASH)
{
int nkeys;
Assert(node->numHashSegments > 0);
Assert(node->numHashSegments <= recvSlice->planNumSegments);
nkeys = list_length(node->hashExprs);
if (nkeys > 0)
motionstate->hashExprs = ExecInitExprList(node->hashExprs,
(PlanState *) motionstate);
/*
* Create hash API reference
*/
motionstate->cdbhash = makeCdbHash(motionstate->numHashSegments,
nkeys,
node->hashFuncs);
}
/*
* Merge Receive: Set up the key comparator and priority queue.
*
* This is very similar to a Merge Append.
*/
if (node->sendSorted && motionstate->mstype == MOTIONSTATE_RECV)
{
int numInputSegs = motionstate->numInputSegs;
int lastSortColIdx = 0;
/* Allocate array to slots for the next tuple from each sender */
motionstate->slots = palloc0(numInputSegs * sizeof(TupleTableSlot *));
/* Prepare SortSupport data for each column */
motionstate->numSortCols = node->numSortCols;
motionstate->sortKeys = (SortSupport) palloc0(node->numSortCols * sizeof(SortSupportData));
for (int i = 0; i < node->numSortCols; i++)
{
SortSupport sortKey = &motionstate->sortKeys[i];
AssertArg(node->sortColIdx[i] != 0);
AssertArg(node->sortOperators[i] != 0);
sortKey->ssup_cxt = CurrentMemoryContext;
sortKey->ssup_collation = node->collations[i];
sortKey->ssup_nulls_first = node->nullsFirst[i];
sortKey->ssup_attno = node->sortColIdx[i];
PrepareSortSupportFromOrderingOp(node->sortOperators[i], sortKey);
/* Also make note of the last column used in the sort key */
if (node->sortColIdx[i] > lastSortColIdx)
lastSortColIdx = node->sortColIdx[i];
}
motionstate->lastSortColIdx = lastSortColIdx;
motionstate->tupleheap =
binaryheap_allocate(motionstate->numInputSegs,
CdbMergeComparator,
motionstate);
}
/*
* Perform per-node initialization in the motion layer.
*/
UpdateMotionLayerNode(motionstate->ps.state->motionlayer_context,
node->motionID,
node->sendSorted,
tupDesc);
#ifdef CDB_MOTION_DEBUG
motionstate->outputFunArray = (Oid *) palloc(tupDesc->natts * sizeof(Oid));
for (int i = 0; i < tupDesc->natts; i++)
{
bool typisvarlena;
getTypeOutputInfo(tupDesc->attrs[i].atttypid,
&motionstate->outputFunArray[i],
&typisvarlena);
}
#endif
estate->currentSliceId = parentIndex;
return motionstate;
}
/* ----------------------------------------------------------------
* ExecEndMotion(node)
* ----------------------------------------------------------------
*/
void
ExecEndMotion(MotionState *node)
{
Motion *motion = (Motion *) node->ps.plan;
#ifdef MEASURE_MOTION_TIME
double otherTimeSec;
double motionTimeSec;
#endif
ExecFreeExprContext(&node->ps);
/*
* clean out the tuple table
*/
ExecClearTuple(node->ps.ps_ResultTupleSlot);
/*
* Set the slice no for the nodes under this motion.
*/
Assert(node->ps.state != NULL);
/*
* shut down the subplan
*/
ExecEndNode(outerPlanState(node));
#ifdef MEASURE_MOTION_TIME
motionTimeSec = (double) node->motionTime.tv_sec + (double) node->motionTime.tv_usec / 1000000.0;
if (node->mstype == MOTIONSTATE_RECV)
{
elog(DEBUG1,
"Motion Node %d (RECEIVER) Statistics:\n"
"Timing: \n"
"\t Time receiving the tuple: %f sec\n"
"Counters: \n"
"\tnumTuplesFromChild: %d\n"
"\tnumTuplesFromAMS: %d\n"
"\tnumTuplesToAMS: %d\n"
"\tnumTuplesToParent: %d\n",
motNodeID,
motionTimeSec,
node->numTuplesFromChild,
node->numTuplesFromAMS,
node->numTuplesToAMS,
node->numTuplesToParent
);
}
else if (node->mstype == MOTIONSTATE_SEND)
{
otherTimeSec = (double) node->otherTime.tv_sec + (double) node->otherTime.tv_usec / 1000000.0;
elog(DEBUG1,
"Motion Node %d (SENDER) Statistics:\n"
"Timing: \n"
"\t Time getting next tuple to send: %f sec \n"
"\t Time sending the tuple: %f sec\n"
"\t Percentage of time sending: %2.2f%% \n"
"Counters: \n"
"\tnumTuplesFromChild: %d\n"
"\tnumTuplesToAMS: %d\n",
motNodeID,
otherTimeSec,
motionTimeSec,
(double) (motionTimeSec / (otherTimeSec + motionTimeSec)) * 100,
node->numTuplesFromChild,
node->numTuplesToAMS
);
}
#endif /* MEASURE_MOTION_TIME */
/* Merge Receive: Free the priority queue and associated structures. */
if (node->tupleheap != NULL)
{
binaryheap_free(node->tupleheap);
node->tupleheap = NULL;
}
/* Free the slices and routes */
if (node->cdbhash != NULL)
{
pfree(node->cdbhash);
node->cdbhash = NULL;
}
/*
* Free up this motion node's resources in the Motion Layer.
*
* TODO: For now, we don't flush the comm-layer. NO ERRORS DURING AMS!!!
*/
EndMotionLayerNode(node->ps.state->motionlayer_context, motion->motionID,
/* flush-comm-layer */ false);
#ifdef CDB_MOTION_DEBUG
if (node->outputFunArray)
pfree(node->outputFunArray);
#endif
}
/*=========================================================================
* HELPER FUNCTIONS
*/
/*
* CdbMergeComparator:
* Used to compare tuples for a sorted motion node.
*/
static int
CdbMergeComparator(Datum lhs, Datum rhs, void *context)
{
MotionState *node = (MotionState *) context;
int lSegIdx = DatumGetInt32(lhs);
int rSegIdx = DatumGetInt32(rhs);
TupleTableSlot *lslot = node->slots[lSegIdx];
TupleTableSlot *rslot = node->slots[rSegIdx];
SortSupport sortKeys = node->sortKeys;
int nkey;
int compare;
Assert(lslot && rslot);
for (nkey = 0; nkey < node->numSortCols; nkey++)
{
SortSupport ssup = &sortKeys[nkey];
AttrNumber attno = ssup->ssup_attno;
Datum datum1,
datum2;
bool isnull1,
isnull2;
/*
* The caller has called slot_getsomeattrs() to ensure
* that all the columns we need are available directly in
* the values/isnull arrays.
*/
datum1 = lslot->tts_values[attno - 1];
isnull1 = lslot->tts_isnull[attno - 1];
datum2 = rslot->tts_values[attno - 1];
isnull2 = rslot->tts_isnull[attno - 1];
compare = ApplySortComparator(datum1, isnull1,
datum2, isnull2,
ssup);
if (compare != 0)
{
INVERT_COMPARE_RESULT(compare);
return compare;
}
}
return 0;
} /* CdbMergeComparator */
/*
* Experimental code that will be replaced later with new hashing mechanism
*/
uint32
evalHashKey(ExprContext *econtext, List *hashkeys, CdbHash * h)
{
ListCell *hk;
MemoryContext oldContext;
unsigned int target_seg;
ResetExprContext(econtext);
oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
/*
* If we have 1 or more distribution keys for this relation, hash them.
* However, If this happens to be a relation with an empty policy
* (partitioning policy with a NULL distribution key list) then we have no
* hash key value to feed in, so use cdbhashrandomseg() to pick a segment
* at random.
*/
if (list_length(hashkeys) > 0)
{
int i;
cdbhashinit(h);
i = 0;
foreach(hk, hashkeys)
{
ExprState *keyexpr = (ExprState *) lfirst(hk);
Datum keyval;
bool isNull;
/*
* Get the attribute value of the tuple
*/
keyval = ExecEvalExpr(keyexpr, econtext, &isNull);
/*
* Compute the hash function
*/
cdbhash(h, i + 1, keyval, isNull);
i++;
}
target_seg = cdbhashreduce(h);
}
else
{
target_seg = cdbhashrandomseg(h->numsegs);
}
MemoryContextSwitchTo(oldContext);
return target_seg;
}
void
doSendEndOfStream(Motion *motion, MotionState *node)
{
/*
* We have no more child tuples, but we have not successfully sent an
* End-of-Stream token yet.
*/
SendEndOfStream(node->ps.state->motionlayer_context,
node->ps.state->interconnect_context,
motion->motionID);
node->sentEndOfStream = true;
}
/*
* A crufty confusing part of the current code is how contentId is used within
* the motion structures and then how that gets translated to targetRoutes by
* this motion nodes.
*
* WARNING: There are ALOT of assumptions in here about how the motion node
* instructions are encoded into motion and stuff.
*
* There are 3 types of sending that can happen here:
*
* FIXED - sending to a single process. the value in node->fixedSegIdxMask[0]
* is the contentId of who to send to. But we can actually ignore that
* since now with slice tables, we should only have a single CdbProcess
* that we could send to for this motion node.
*
*
* BROADCAST - actually a subcase of FIXED, but handling is simple. send to all
* of our routes.
*
* HASH - maps hash values to segid. this mapping is 1->1 such that a hash
* value of 2 maps to contentid of 2 (for now). Since we can't ever
* use Hash to send to the QD, the QD's contentid of -1 is not an issue.
* Also, the contentid maps directly to the routeid.
*
*/
void
doSendTuple(Motion *motion, MotionState *node, TupleTableSlot *outerTupleSlot)
{
int16 targetRoute;
SendReturnCode sendRC;
ExprContext *econtext = node->ps.ps_ExprContext;
/* We got a tuple from the child-plan. */
node->numTuplesFromChild++;
if (motion->motionType == MOTIONTYPE_GATHER ||
motion->motionType == MOTIONTYPE_GATHER_SINGLE)
{
/*
* Actually, since we can only send to a single output segment
* here, we are guaranteed that we only have a single targetRoute
* setup that we could possibly send to. So we can cheat and just
* fix the targetRoute to 0 (the 1st route).
*/
targetRoute = 0;
}
else if (motion->motionType == MOTIONTYPE_BROADCAST)
{
targetRoute = BROADCAST_SEGIDX;
}
else if (motion->motionType == MOTIONTYPE_HASH) /* Redistribute */
{
uint32 hval = 0;
econtext->ecxt_outertuple = outerTupleSlot;
hval = evalHashKey(econtext, node->hashExprs, node->cdbhash);
#ifdef USE_ASSERT_CHECKING
Assert(hval < node->numHashSegments &&
"redistribute destination outside segment array");
#endif /* USE_ASSERT_CHECKING */
/*
* hashSegIdx takes our uint32 and maps it to an int, and here we
* assign it to an int16. See below.
*/
targetRoute = hval;
/*
* see MPP-2099, let's not run into this one again! NOTE: the
* definition of BROADCAST_SEGIDX is key here, it *cannot* be a valid
* route which our map (above) will *ever* return.
*
* Note the "mapping" is generated at *planning* time in
* makeDefaultSegIdxArray() in cdbmutate.c (it is the trivial map, and
* is passed around our system a fair amount!).
*/
Assert(targetRoute != BROADCAST_SEGIDX);
}
else if (motion->motionType == MOTIONTYPE_EXPLICIT)
{
Datum segidColIdxDatum;
Assert(motion->segidColIdx > 0 && motion->segidColIdx <= list_length((motion->plan).targetlist));
bool is_null = false;
segidColIdxDatum = slot_getattr(outerTupleSlot, motion->segidColIdx, &is_null);
targetRoute = Int32GetDatum(segidColIdxDatum);
Assert(!is_null);
}
else
elog(ERROR, "unknown motion type %d", motion->motionType);
CheckAndSendRecordCache(node->ps.state->motionlayer_context,
node->ps.state->interconnect_context,
motion->motionID,
targetRoute);
/* send the tuple out. */
sendRC = SendTuple(node->ps.state->motionlayer_context,
node->ps.state->interconnect_context,
motion->motionID,
outerTupleSlot,
targetRoute);
Assert(sendRC == SEND_COMPLETE || sendRC == STOP_SENDING);
if (sendRC == SEND_COMPLETE)
node->numTuplesToAMS++;
else
node->stopRequested = true;
#ifdef CDB_MOTION_DEBUG
if (sendRC == SEND_COMPLETE && node->numTuplesToAMS <= 20)
{
StringInfoData buf;
initStringInfo(&buf);
appendStringInfo(&buf, " motion%-3d snd->%-3d, %5d.",
motion->motionID,
targetRoute,
node->numTuplesToAMS);
formatTuple(&buf, outerTupleSlot, node->outputFunArray);
elog(DEBUG3, "%s", buf.data);
pfree(buf.data);
}
#endif
}
/*
* ExecReScanMotion
*
* Motion nodes do not allow rescan after a tuple has been fetched.
*
* When the planner knows that a NestLoop cannot have more than one outer
* tuple, it can omit the usual Materialize operator atop the inner subplan,
* which can lead to invocation of ExecReScanMotion before the motion node's
* first tuple is fetched. Rescan can be implemented as a no-op in this case.
* (After ExecNestLoop fetches an outer tuple, it invokes rescan on the inner
* subplan before fetching the first inner tuple. That doesn't bother us,
* provided there is only one outer tuple.)
*/
void
ExecReScanMotion(MotionState *node)
{
if (node->mstype != MOTIONSTATE_RECV ||
node->numTuplesToParent != 0)
{
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("illegal rescan of motion node: invalid plan"),
errhint("Likely caused by bad NL-join, try setting enable_nestloop to off")));
}
return;
}
/*
* Mark this node as "stopped." When ExecProcNode() is called on a
* stopped motion node it should behave as if there are no tuples
* available.
*
* ExecProcNode() on a stopped motion node should also notify the
* "other end" of the motion node of the stoppage.
*
* Note: once this is called, it is possible that the motion node will
* never be called again, so we *must* send the stop message now.
*/
void
ExecSquelchMotion(MotionState *node)
{
Motion *motion;
AssertArg(node != NULL);
motion = (Motion *) node->ps.plan;
node->stopRequested = true;
node->ps.state->active_recv_id = -1;
/* pass down */
SendStopMessage(node->ps.state->motionlayer_context,
node->ps.state->interconnect_context,
motion->motionID);
}
相关信息
相关文章
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦