greenplum nodeSplitUpdate 源码

  • 2022-08-18
  • 浏览 (378)

greenplum nodeSplitUpdate 代码

文件路径:/src/backend/executor/nodeSplitUpdate.c

/*-------------------------------------------------------------------------
 *
 * nodeSplitUpdate.c
 *	  Implementation of nodeSplitUpdate.
 *
 * Portions Copyright (c) 2012, EMC Corp.
 * Portions Copyright (c) 2012-Present VMware, Inc. or its affiliates.
 *
 *
 * IDENTIFICATION
 *	    src/backend/executor/nodeSplitUpdate.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"
#include "miscadmin.h"

#include "cdb/cdbhash.h"
#include "cdb/cdbutil.h"
#include "commands/tablecmds.h"
#include "executor/instrument.h"
#include "executor/nodeSplitUpdate.h"

#include "utils/memutils.h"

/*
 * Forward declaration.
 *
 * SplitTupleTableSlot() takes one input row (its column arrays are passed as
 * 'values' / 'nulls') and fills in the DELETE and INSERT tuple slots held in
 * the SplitUpdateState.
 */
static void SplitTupleTableSlot(TupleTableSlot *slot,
								List *targetList, SplitUpdate *plannode, SplitUpdateState *node,
								Datum *values, bool *nulls);

/*
 * Memory used by node.
 *
 * NOTE(review): this macro is not referenced anywhere else in the visible
 * part of this file, and its unit is not stated -- confirm whether it is
 * still needed.
 */
#define SPLITUPDATE_MEM 1


/*
 * evalHashKey
 *	  Hash the distribution-key columns of a row and return the segment ID
 *	  that the row maps to.
 *
 * node		executor state; supplies the CdbHash object, the expression
 *			context and (through its plan) the list of hash key columns
 * values	column values of the row, indexed by (attribute number - 1)
 * isnulls	null flags of the row, indexed the same way
 *
 * The expression context is reset on entry, and the hashing is done while
 * its per-tuple memory context is current. The caller's memory context is
 * restored before returning.
 */
static uint32
evalHashKey(SplitUpdateState *node, Datum *values, bool *isnulls)
{
	SplitUpdate *splitPlan = (SplitUpdate *) node->ps.plan;
	ExprContext *exprCtx = node->ps.ps_ExprContext;
	CdbHash	   *hasher = node->cdbhash;
	MemoryContext callerCtx;
	unsigned int segno;
	int			keyIdx = 0;

	ResetExprContext(exprCtx);
	callerCtx = MemoryContextSwitchTo(exprCtx->ecxt_per_tuple_memory);

	cdbhashinit(hasher);

	/* Feed every key column into the hash; key positions are 1-based. */
	while (keyIdx < splitPlan->numHashAttrs)
	{
		AttrNumber	attno = splitPlan->hashAttnos[keyIdx];

		cdbhash(hasher, keyIdx + 1, values[attno - 1], isnulls[attno - 1]);
		keyIdx++;
	}

	segno = cdbhashreduce(hasher);

	MemoryContextSwitchTo(callerCtx);

	return segno;
}

/*
 * SplitTupleTableSlot
 *	  Fill node->deleteTuple and node->insertTuple from one input row.
 *
 * slot			input row; all of its attributes are deformed here
 * targetList	target list describing the output columns, one entry each
 * plannode		plan node; supplies deleteColIdx and insertColIdx, which are
 *				walked in step with the leading target entries
 * node			executor state holding the two output slots and the
 *				gp_segment_id attribute numbers
 * values/nulls	column arrays of the input row, indexed by
 *				(attribute number - 1)
 *
 * How each target entry is handled:
 *	- DMLActionExpr: DML_DELETE goes into the delete tuple and DML_INSERT
 *	  into the insert tuple.
 *	- resno within the first list_length(insertColIdx) columns: the delete
 *	  tuple gets the column named by deleteColIdx (or NULL when that index
 *	  is -1), and the insert tuple gets the column named by insertColIdx.
 *	- the output gp_segment_id column: the delete tuple copies the input
 *	  segment ID; the insert tuple's value is computed after the loop.
 *	- any other Var: the referenced input column is copied to both tuples.
 *	- anything else: not written here, so the column keeps whatever the
 *	  caller stored (ExecSplitUpdate stores all-null tuples beforehand).
 */
static void
SplitTupleTableSlot(TupleTableSlot *slot,
					List *targetList, SplitUpdate *plannode, SplitUpdateState *node,
					Datum *values, bool *nulls)
{
	Datum	   *delVals = node->deleteTuple->tts_values;
	bool	   *delNulls = node->deleteTuple->tts_isnull;
	Datum	   *insVals = node->insertTuple->tts_values;
	bool	   *insNulls = node->insertTuple->tts_isnull;
	ListCell   *oldColCell = list_head(plannode->deleteColIdx);
	ListCell   *newColCell = list_head(plannode->insertColIdx);
	int			numUpdateCols = list_length(plannode->insertColIdx);
	ListCell   *lc;

	slot_getallattrs(slot);

	foreach(lc, targetList)
	{
		TargetEntry *entry = lfirst(lc);
		int			out = entry->resno - 1; /* 0-based output column */

		/* Action column: tag each tuple with its DML operation. */
		if (IsA(entry->expr, DMLActionExpr))
		{
			delVals[out] = Int32GetDatum((int) DML_DELETE);
			delNulls[out] = false;

			insVals[out] = Int32GetDatum((int) DML_INSERT);
			insNulls[out] = false;
			continue;
		}

		/* Table column: old value to the delete tuple, new to the insert. */
		if (entry->resno <= numUpdateCols)
		{
			int			oldAttno = lfirst_int(oldColCell);
			int			newAttno = lfirst_int(newColCell);

			if (oldAttno != -1)
			{
				delVals[out] = values[oldAttno - 1];
				delNulls[out] = nulls[oldAttno - 1];
			}
			else
			{
				/* -1 means there is no old value for this column */
				delVals[out] = (Datum) 0;
				delNulls[out] = true;
			}

			insVals[out] = values[newAttno - 1];
			insNulls[out] = nulls[newAttno - 1];

			oldColCell = lnext(oldColCell);
			newColCell = lnext(newColCell);
			continue;
		}

		/* Segment ID column: only the delete side is known at this point. */
		if (entry->resno == node->output_segid_attno)
		{
			Assert(!nulls[node->input_segid_attno - 1]);

			delVals[out] = values[node->input_segid_attno - 1];
			delNulls[out] = false;
			continue;
		}

		/* Remaining (resjunk) columns: pass plain Vars through to both. */
		if (IsA(entry->expr, Var))
		{
			Var		   *ref = (Var *) entry->expr;
			int			in = ref->varattno - 1;

			Assert(ref->varno == OUTER_VAR);

			delVals[out] = values[in];
			delNulls[out] = nulls[in];

			insVals[out] = values[in];
			insNulls[out] = nulls[in];

			Assert(ref->vartype == TupleDescAttr(slot->tts_tupleDescriptor, ref->varattno - 1)->atttypid);
		}
	}

	/*
	 * Now that every other insert column is filled in, hash the new row to
	 * find the segment it belongs to.
	 */
	if (node->output_segid_attno > 0)
	{
		int32		newSeg = evalHashKey(node, insVals, insNulls);

		insVals[node->output_segid_attno - 1] = Int32GetDatum(newSeg);
		insNulls[node->output_segid_attno - 1] = false;
	}
}

/*
 * ExecSplitUpdate
 *	  Return the next output tuple of the SplitUpdate node.
 *
 * Each input row from the child plan yields two output tuples on
 * consecutive calls: first the DELETE tuple, then the INSERT tuple.
 * node->processInsert tracks which call this is:
 *	- true:  fetch a new input row, build both tuples, return the DELETE
 *			 tuple and clear the flag;
 *	- false: return the INSERT tuple built by the previous call and set
 *			 the flag again.
 *
 * Returns NULL once the child plan is exhausted.
 */
static TupleTableSlot *
ExecSplitUpdate(PlanState *pstate)
{
	SplitUpdateState *node = castNode(SplitUpdateState, pstate);
	PlanState  *child = outerPlanState(node);
	SplitUpdate *splitPlan = (SplitUpdate *) node->ps.plan;
	TupleTableSlot *inputSlot;

	Assert(child != NULL);

	/* Second half of a pair: hand out the INSERT tuple already built. */
	if (!node->processInsert)
	{
		node->processInsert = true;
		return node->insertTuple;
	}

	/* First half: pull a fresh row from the child. */
	inputSlot = ExecProcNode(child);
	if (TupIsNull(inputSlot))
		return NULL;

	slot_getallattrs(inputSlot);

	/* Start both output tuples as all-NULL, then fill them in. */
	ExecStoreAllNullTuple(node->deleteTuple);
	ExecStoreAllNullTuple(node->insertTuple);

	SplitTupleTableSlot(inputSlot,
						splitPlan->plan.targetlist,
						splitPlan,
						node,
						inputSlot->tts_values,
						inputSlot->tts_isnull);

	node->processInsert = false;
	return node->deleteTuple;
}

/*
 * ExecInitSplitUpdate
 *	  Build the executor state for a SplitUpdate plan node.
 *
 * node		the SplitUpdate plan node
 * estate	executor state the new node belongs to
 * eflags	executor flags; backward scan, mark and rewind are not supported
 *
 * Initializes the child plan, an expression context, the two output slots
 * (DELETE and INSERT), the gp_segment_id attribute positions and, when the
 * plan has hash attributes, the CdbHash used to compute target segments.
 */
SplitUpdateState*
ExecInitSplitUpdate(SplitUpdate *node, EState *estate, int eflags)
{
	SplitUpdateState *state;
	Plan	   *subplan;
	TupleDesc	outDesc;

	/* Check for unsupported flags */
	Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK | EXEC_FLAG_REWIND)));

	state = makeNode(SplitUpdateState);
	state->ps.plan = (Plan *) node;
	state->ps.state = estate;
	state->ps.ExecProcNode = ExecSplitUpdate;
	/* The first ExecSplitUpdate call must fetch an input row. */
	state->processInsert = true;

	/* Initialize the child plan. */
	subplan = outerPlan(node);
	outerPlanState(state) = ExecInitNode(subplan, estate, eflags);

	ExecAssignExprContext(estate, &state->ps);

	/*
	 * Both output slots share one tuple descriptor, derived from this
	 * node's target list.
	 */
	outDesc = ExecTypeFromTL(node->plan.targetlist);
	state->insertTuple = ExecInitExtraTupleSlot(estate, outDesc, &TTSOpsVirtual);
	state->deleteTuple = ExecInitExtraTupleSlot(estate, outDesc, &TTSOpsVirtual);

	/*
	 * Locate gp_segment_id in the child's target list (input side) and in
	 * our own target list (output side).
	 */
	state->input_segid_attno =
		ExecFindJunkAttributeInTlist(subplan->targetlist, "gp_segment_id");
	state->output_segid_attno =
		ExecFindJunkAttributeInTlist(node->plan.targetlist, "gp_segment_id");

	/* DML nodes do not project, so no projection info is set up. */
	ExecInitResultTupleSlotTL(&state->ps, &TTSOpsVirtual);
	state->ps.ps_ProjInfo = NULL;

	/* Set up hashing only when the plan has hash attributes. */
	if (node->numHashAttrs > 0)
		state->cdbhash = makeCdbHash(node->numHashSegments,
									 node->numHashAttrs,
									 node->hashFuncs);

	/* Allocate the explain buffer when CDB instrumentation is requested. */
	if (estate->es_instrument & INSTRUMENT_CDB)
		state->ps.cdbexplainbuf = makeStringInfo();

	return state;
}

/*
 * ExecEndSplitUpdate
 *	  Shut down a SplitUpdate node: free its expression context, clear the
 *	  result, INSERT and DELETE tuple slots, then shut down the child plan.
 */
void
ExecEndSplitUpdate(SplitUpdateState *node)
{
	PlanState  *self = &node->ps;

	ExecFreeExprContext(self);

	ExecClearTuple(self->ps_ResultTupleSlot);
	ExecClearTuple(node->insertTuple);
	ExecClearTuple(node->deleteTuple);

	ExecEndNode(outerPlanState(node));
}

相关信息

greenplumn 源码目录

相关文章

greenplumn execAmi 源码

greenplumn execCurrent 源码

greenplumn execExpr 源码

greenplumn execExprInterp 源码

greenplumn execGrouping 源码

greenplumn execIndexing 源码

greenplumn execJunk 源码

greenplumn execMain 源码

greenplumn execParallel 源码

greenplumn execPartition 源码

0  赞