greenplumn nodeTableFunction 源码

  • 2022-08-18
  • 浏览 (373)

greenplumn nodeTableFunction 代码

文件路径:/src/backend/executor/nodeTableFunction.c

/*-------------------------------------------------------------------------
 *
 * nodeTableFunction.c
 *	 Support routines for scans of enhanced table functions.
 *
 * DESCRIPTION
 *
 *   This code is distinct from ExecFunctionScan due to the nature of
 *   the plans.  A plain table function will be called without an input
 *   subquery, whereas the enhanced table function framework allows 
 *   table functions operating over table input.
 *
 *  Normal Table Function            Enhanced Table Function
 *
 *         (out)                              (out)
 *           |                                  |
 *     (FunctionScan)                  (TableFunctionScan)
 *                                              |
 *                                        (SubqueryScan)
 *
 * INTERFACE ROUTINES
 * 	 ExecTableFunctionScan			sequentially scans a relation.
 *	 ExecTableFunctionNext			retrieve next tuple in sequential order.
 *	 ExecInitTableFunctionScan		creates and initializes a externalscan node.
 *	 ExecEndTableFunctionScan		releases any storage allocated.
 *	 ExecTableFunctionReScan		rescans the relation
 *
 * Portions Copyright (c) 2011, EMC
 * Portions Copyright (c) 2012-Present VMware, Inc. or its affiliates.
 *
 *
 * IDENTIFICATION
 *	    src/backend/executor/nodeTableFunction.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"
#include "funcapi.h"
#include "tablefuncapi.h"

#include "cdb/cdbvars.h"
#include "executor/executor.h"
#include "executor/nodeTableFunction.h"
#include "nodes/nodeFuncs.h"
#include "parser/parsetree.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"


static TupleTableSlot *TableFunctionNext(TableFunctionState *node);

/* Private structure forward declared in tablefuncapi.h */
typedef struct AnyTableData
{
	ExprContext			*econtext;
	PlanState			*subplan;     /* subplan node */
	TupleDesc            subdesc;     /* tuple descriptor of subplan */
	JunkFilter          *junkfilter;  /* for projection of subplan tuple */
} AnyTableData;

/*
 * TableFunctionNext - ExecScan callback function for table function scans
 */
static TupleTableSlot *
TableFunctionNext(TableFunctionState *node)
{
	MemoryContext        oldcontext  = NULL;
	TupleTableSlot		*slot        = NULL;
	ExprContext			*econtext	 = node->ss.ps.ps_ExprContext;
	bool                 returns_set = node->flinfo.fn_retset;
	HeapTuple            tuple       = NULL;
	TupleDesc            resultdesc  = node->resultdesc;
	Datum                user_result;

	/* Clean up any per-tuple memory */
	ResetExprContext(econtext);

	/* Setup arguments on the first call */
	if (node->is_firstcall)
	{
		int			i = 0;
		ListCell   *lc;

		foreach(lc, node->args)
		{
			ExprState  *argstate = (ExprState *) lfirst(lc);

			if (!argstate)
			{
				/*
				 * The ANYTABLE argument is represented as a NULL in the
				 * arguments list.
				 */
				node->fcinfo->args[i].value = AnyTableGetDatum(node->inputscan);
				node->fcinfo->args[i].isnull = false;
			}
			else
				node->fcinfo->args[i].value = ExecEvalExpr(argstate,
														   econtext,
														   &node->fcinfo->args[i].isnull);
			i++;
		}
		node->is_firstcall = false;
	}

	/*
	 * If all results have been returned by the callback function then
	 * we are done. 
	 */
	if (node->rsinfo.isDone == ExprEndResult)
		return slot;  /* empty slot */

	/* Invoke the user supplied function */
	node->fcinfo->isnull = false;
	oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
	user_result = FunctionCallInvoke(node->fcinfo);
	MemoryContextSwitchTo(oldcontext);
	if (node->rsinfo.returnMode != SFRM_ValuePerCall)
	{
		/* FIXME: should support both protocols */
		ereport(ERROR,
				(errcode(ERRCODE_E_R_I_E_SRF_PROTOCOL_VIOLATED),
				 errmsg("table functions must use SFRM_ValuePerCall protocol")));
	}
	if (node->rsinfo.isDone == ExprEndResult)
		return slot;  /* empty slot */

	/* Mark this the last value if the func doesn't return a set */
	if (!returns_set || node->rsinfo.isDone == ExprSingleResult)
		node->rsinfo.isDone = ExprEndResult;

	/* This would only error if the user violated the SRF calling convensions */
	if (returns_set && node->fcinfo->isnull)
	{
		ereport(ERROR,
				(errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
				 errmsg("function returning set of rows cannot return null value")));
	}

	/* Convert the Datum into tuple and store it into the scan slot */
	if (node->is_rowtype)
	{
		HeapTupleHeader  th;

		if (node->fcinfo->isnull)
		{
			int			i;
			Datum		values[MaxTupleAttributeNumber];
			bool		nulls[MaxTupleAttributeNumber];

			Assert(!returns_set);  /* checked above */
			Assert(resultdesc->natts <= MaxTupleAttributeNumber);
			for (i = 0; i < resultdesc->natts; i++)
				nulls[i] = true;

			/* 
			 * If we get a clean solution to the tuple allocation below we
			 * can use it here as well.  This is less an issue because there
			 * is only a single tuple in this case, so the overhead of a
			 * single palloc is not a big deal.
			 */
			oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
			tuple = heap_form_tuple(resultdesc, values, nulls);
			MemoryContextSwitchTo(oldcontext);
		}
		else
		{

			/* Convert returned HeapTupleHeader into a HeapTuple */
			th	  = DatumGetHeapTupleHeader(user_result);
			tuple = &node->tuple;

			ItemPointerSetInvalid(&(tuple->t_self));
			tuple->t_len  = HeapTupleHeaderGetDatumLength(th);
			tuple->t_data = th;

			/* Double check that this tuple is of the expected form */
			if (resultdesc->tdtypeid != HeapTupleHeaderGetTypeId(th))
			{
				ereport(ERROR, 
						(errcode(ERRCODE_E_R_I_E_SRF_PROTOCOL_VIOLATED),
						 errmsg("invalid tuple returned from table function"),
						 errdetail("Returned tuple does not match output "
								   "tuple descriptor.")));
			}
		}
	}
	else
	{
		/* 
		 * TODO: This will allocate memory for each tuple, we should be able 
		 * to get away with fewer pallocs.
		 */
		oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
		/* &user_result yields a singleton pointer - make sure only one is read */
		Assert(1 == resultdesc->natts);
		tuple = heap_form_tuple(resultdesc, 
								&user_result, 
								&node->fcinfo->isnull);
		MemoryContextSwitchTo(oldcontext);
	}

	/* 
	 * Store the tuple into the scan slot.
	 *
	 * Note: Tuple should be allocated in the per-row memory context, so they
	 * will be freed automatically when the context is freed, we cannot free
	 * them again here.
	 */
	Assert(tuple);
	slot = ExecStoreHeapTuple(tuple, 
							  node->ss.ss_ScanTupleSlot, 
							  false /* shouldFree */);
	Assert(!TupIsNull(slot));

	node->ss.ss_ScanTupleSlot = slot;
	return slot;
}

/*
 * TableFunctionRecheck -- access method routine to recheck a tuple in EvalPlanQual
 */
static bool
TableFunctionRecheck(TableFunctionState *node, TupleTableSlot *slot)
{
	/* nothing to check */
	return true;
}

/*
 * ExecTableFunction - wrapper around TableFunctionNext
 */
static TupleTableSlot *
ExecTableFunction(PlanState *pstate)
{
	TableFunctionState *node = castNode(TableFunctionState, pstate);

	return ExecScan(&node->ss,
					(ExecScanAccessMtd) TableFunctionNext,
					(ExecScanRecheckMtd) TableFunctionRecheck);
}


/*
 * ExecInitTableFunction - Setup the table function executor 
 */
TableFunctionState *
ExecInitTableFunction(TableFunctionScan *node, EState *estate, int eflags)
{
	TableFunctionState	*scanstate;
	PlanState           *subplan;
	Oid					 funcrettype;
	TypeFuncClass		 functypclass;
	RangeTblFunction *rtfunc;
	FuncExpr            *func;
	ExprContext         *econtext;
	TupleDesc            inputdesc  = NULL;
	TupleDesc			 resultdesc = NULL;
	ListCell   *lc;
	List	   *argstates;

	/* Inner plan is not used, outer plan must be present */
	Assert(innerPlan(node) == NULL);
	Assert(outerPlan(node) != NULL);

	/* Forward scan only */
	Assert(!(eflags & (EXEC_FLAG_MARK | EXEC_FLAG_BACKWARD)));

	/*
	 * Create state structure.
	 */
	scanstate = makeNode(TableFunctionState);
	scanstate->ss.ps.plan  = (Plan *)node;
	scanstate->ss.ps.state = estate;
	scanstate->ss.ps.ExecProcNode = ExecTableFunction;
	scanstate->inputscan   = palloc0(sizeof(AnyTableData));
	scanstate->is_firstcall = true;

	/* Create expression context for the node. */
	ExecAssignExprContext(estate, &scanstate->ss.ps);
	econtext = scanstate->ss.ps.ps_ExprContext;

	/* Initialize child nodes */
	outerPlanState(scanstate) = ExecInitNode(outerPlan(node), estate, eflags);
	subplan   = outerPlanState(scanstate);
	inputdesc = CreateTupleDescCopy(ExecGetResultType(subplan));

	/* 
	 * The funcexpr must be a function call.  This check is to verify that
	 * the planner didn't try to perform constant folding or other inlining
	 * on a function invoked as a table function.
	 */
	rtfunc = node->function;
	if (!rtfunc->funcexpr || !IsA(rtfunc->funcexpr, FuncExpr))
	{
		/* should not be possible */
		elog(ERROR, "table function expression is not a function expression");
	}
	func = (FuncExpr *) rtfunc->funcexpr;
	functypclass = get_expr_result_type((Node*) func, &funcrettype, &resultdesc);
	
	switch (functypclass)
	{
		case TYPEFUNC_COMPOSITE:
		{
			/* Composite data type: copy the typcache entry for safety */
			Assert(resultdesc);
			resultdesc = CreateTupleDescCopy(resultdesc);
			scanstate->is_rowtype = true;
			break;
		}

		case TYPEFUNC_RECORD:
		{
			/* Record data type: Construct tuple desc based on rangeTable */
			resultdesc = BuildDescFromLists(rtfunc->funccolnames,
											rtfunc->funccoltypes,
											rtfunc->funccoltypmods,
											rtfunc->funccolcollations);
			scanstate->is_rowtype = true;
			break;
		}

		case TYPEFUNC_SCALAR:
		{
			/* Scalar data type: Construct a tuple descriptor manually */
			char	   *attname;

			if (rtfunc->funccolnames)
				attname = strVal(linitial(rtfunc->funccolnames));
			else
				attname = NULL;

			resultdesc = CreateTemplateTupleDesc(1);
			TupleDescInitEntry(resultdesc,
							   (AttrNumber) 1,
							   attname,
							   funcrettype,
							   -1,
							   0);
			TupleDescInitEntryCollation(resultdesc,
										(AttrNumber) 1,
										exprCollation(rtfunc->funcexpr));
			scanstate->is_rowtype = false;
			break;
		}

		default:
		{
			/* This should not be possible, it should be caught by parser. */
			elog(ERROR, "table function has unsupported return type");
		}
	}

	/*
	 * For RECORD results, make sure a typmod has been assigned.  (The
	 * function should do this for itself, but let's cover things in case it
	 * doesn't.)
	 */
	BlessTupleDesc(resultdesc);

	/* Initialize scan slot and type */
	ExecInitScanTupleSlot(estate, &scanstate->ss, resultdesc,
						  &TTSOpsHeapTuple);
	scanstate->resultdesc = resultdesc;

	/* Initialize result tuple type and projection info */
	ExecInitResultTypeTL(&scanstate->ss.ps);
	ExecAssignScanProjectionInfo(&scanstate->ss);

	/* Initialize child expressions */
	scanstate->ss.ps.qual =
		ExecInitQual(node->scan.plan.qual, (PlanState *) scanstate);

	/*
	 * Other node-specific setup
	 */
	scanstate->rsinfo.type		   = T_ReturnSetInfo;
	scanstate->rsinfo.econtext	   = econtext;
	scanstate->rsinfo.expectedDesc = resultdesc;
	scanstate->rsinfo.allowedModes = (int) (SFRM_ValuePerCall);
	scanstate->rsinfo.returnMode   = (int) (SFRM_ValuePerCall);
	scanstate->rsinfo.isDone	   = ExprSingleResult;
	scanstate->rsinfo.setResult    = NULL;
	scanstate->rsinfo.setDesc	   = NULL;

	scanstate->userdata = rtfunc->funcuserdata;

	/* setup the AnyTable input */
	scanstate->inputscan->econtext = econtext;
	scanstate->inputscan->subplan  = subplan;
	scanstate->inputscan->subdesc  = inputdesc;

	/* Determine projection information for subplan */
	scanstate->inputscan->junkfilter =
		ExecInitJunkFilter(subplan->plan->targetlist, 
						   NULL  /* slot */);
	BlessTupleDesc(scanstate->inputscan->junkfilter->jf_cleanTupType);

	/*
	 * Prepare for execution of the function.
	 */
	fmgr_info(func->funcid, &scanstate->flinfo);
	if (scanstate->flinfo.fn_nargs != list_length(func->args))
		elog(ERROR, "number of arguments between TableFunctionScan and Fmgrinfo don't match");
	scanstate->fcinfo = palloc0(SizeForFunctionCallInfo(scanstate->flinfo.fn_nargs));

	InitFunctionCallInfoData(*scanstate->fcinfo,		/* Fcinfo  */
							 &scanstate->flinfo,		/* Flinfo  */
							 list_length(func->args), /* Nargs   */
							 InvalidOid,					  /* input_collation */
							 (Node *) scanstate,                   /* Context */
							 (Node *) &(scanstate->rsinfo));       /* ResultInfo */

	/*
	 * Initialize ExprStates for all arguments.
	 */
	int			count = 0;
	argstates = NIL;
	foreach(lc, func->args)
	{
		Expr	   *arg = (Expr *) lfirst(lc);
		ExprState  *argstate;

		if (IsA(arg, TableValueExpr))
		{
			count++;
			argstate = NULL;
		}
		else
		{
			argstate = ExecInitExpr(arg, &scanstate->ss.ps);
		}
		argstates = lappend(argstates, argstate);
	}

	/*
	 * Currently we don't allow table functions with more than one table value
	 * expression arguments, and if they don't have at least one they will be
	 * planned as nodeFunctionScan instead of nodeTableFunctionScan.
	 * Therefore we should have found exactly 1 TableValueExpr above.
	 */
	if (count != 1)
		elog(ERROR, "table functions over multiple TABLE value expressions not yet supported");

	scanstate->args = argstates;

	return scanstate;
}

void
ExecEndTableFunction(TableFunctionState *node)
{
	/* Free the ExprContext */
	ExecFreeExprContext(&node->ss.ps);
	
	/* Clean out the tuple table */
	if (node->ss.ps.ps_ResultTupleSlot)
		ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
	ExecClearTuple(node->ss.ss_ScanTupleSlot);

	/* End the subplans */
	ExecEndNode(outerPlanState(node));
}

void
ExecReScanTableFunction(TableFunctionState *node)
{
	/* TableFunction Planner marks TableFunction nodes as not rescannable */
	if (!node->is_firstcall)
		elog(ERROR, "invalid rescan of TableFunctionScan");
}


/* Callback functions exposed to the user */
TupleDesc 
AnyTable_GetTupleDesc(AnyTable t)
{
	if (t == NULL)
	{
		ereport(ERROR, 
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("invalid null value for anytable type")));
	}
	Assert(t->junkfilter && IsA(t->junkfilter, JunkFilter));

	/* Return the projected tuple descriptor */
	return t->junkfilter->jf_cleanTupType;
}

HeapTuple
AnyTable_GetNextTuple(AnyTable t)
{
	MemoryContext oldcontext;
	TupleTableSlot *slot;

	if (t == NULL)
	{
		ereport(ERROR, 
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("invalid null value for anytable type")));
	}

	/* Fetch the next tuple into the tuple slot */
	oldcontext = MemoryContextSwitchTo(t->econtext->ecxt_per_query_memory);
	t->econtext->ecxt_outertuple = ExecProcNode(t->subplan);
	MemoryContextSwitchTo(oldcontext);
	if (TupIsNull(t->econtext->ecxt_outertuple))
	{
		return (HeapTuple) NULL;
	}

	/* ----------------------------------------
	 * 1) Fetch the tuple from the tuple slot
	 * 2) apply resjunk filtering
	 * 3) copy result into a HeapTuple
	 * ----------------------------------------
	 */

	/* GPDB_91_MERGE_FIXME: We used to call  ExecRemoveJunk here, but it
	 * was removed in the upstream. I copied the implementation of
	 * ExecRemoveJunk here, but based on the commit message (2e852e541c),
	 * I don't think we should be doing this either
	 */
	slot = ExecFilterJunk(t->junkfilter, t->econtext->ecxt_outertuple);
	return ExecCopySlotHeapTuple(slot);
}

/*
 * tf_set_userdata_internal
 * This is the entity of TF_SET_USERDATA() API. Sets bytea datum to
 * RangeTblEntry, which is transported to project function via serialized
 * plan tree.
 */
void
tf_set_userdata_internal(FunctionCallInfo fcinfo, bytea *userdata)
{
	RangeTblFunction *rtfunc;

	if (!fcinfo->context || !IsA(fcinfo->context, RangeTblFunction))
		ereport(ERROR,
				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
				 errmsg("expected RangeTblFunction node, found %d",
				 fcinfo->context ? nodeTag(fcinfo->context) : 0)));
	rtfunc = (RangeTblFunction *) fcinfo->context;

	/* Make sure it gets detoasted, but packed is allowed */
	rtfunc->funcuserdata =
		userdata ? pg_detoast_datum_packed(userdata) : NULL;
}

/*
 * tf_get_userdata_internal
 * This is the entity of TF_GET_USERDATA() API. Extracts userdata from
 * its scan node which was transported via serialized plan tree.
 */
bytea *
tf_get_userdata_internal(FunctionCallInfo fcinfo)
{
	bytea	   *userdata;

	if (!fcinfo->context || !IsA(fcinfo->context, TableFunctionState))
		ereport(ERROR,
				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
				 errmsg("expected TableFunctionState node, found %d",
				 fcinfo->context ? nodeTag(fcinfo->context) : 0)));

	userdata = ((TableFunctionState *) fcinfo->context)->userdata;
	if (!userdata)
		return NULL;

	/* unpack, just in case */
	return pg_detoast_datum(userdata);
}

相关信息

greenplumn 源码目录

相关文章

greenplumn execAmi 源码

greenplumn execCurrent 源码

greenplumn execExpr 源码

greenplumn execExprInterp 源码

greenplumn execGrouping 源码

greenplumn execIndexing 源码

greenplumn execJunk 源码

greenplumn execMain 源码

greenplumn execParallel 源码

greenplumn execPartition 源码

0  赞