greenplumn cdbdisp_query source code

  • 2022-08-18

greenplumn cdbdisp_query code

File path: /src/backend/cdb/dispatcher/cdbdisp_query.c

/*-------------------------------------------------------------------------
 *
 * cdbdisp_query.c
 *	  Functions to dispatch command string or plan to QExecutors.
 *
 *
 * Portions Copyright (c) 2005-2008, Greenplum inc
 * Portions Copyright (c) 2012-Present VMware, Inc. or its affiliates.
 *
 *
 * IDENTIFICATION
 *	    src/backend/cdb/dispatcher/cdbdisp_query.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/xact.h"
#include "libpq-fe.h"
#include "libpq-int.h"
#include "cdb/cdbconn.h"
#include "cdb/cdbgang.h"
#include "cdb/cdbutil.h"
#include "cdb/cdbvars.h"
#include "cdb/cdbmutate.h"
#include "cdb/cdbsrlz.h"
#include "cdb/tupleremap.h"
#include "catalog/namespace.h" /* for GetTempNamespaceState() */
#include "nodes/execnodes.h"
#include "pgstat.h"
#include "tcop/tcopprot.h"
#include "utils/datum.h"
#include "utils/guc.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/faultinjector.h"
#include "utils/resgroup.h"
#include "utils/resource_manager.h"
#include "utils/resgroup-ops.h"
#include "utils/session_state.h"
#include "utils/typcache.h"
#include "miscadmin.h"
#include "mb/pg_wchar.h"

#include "cdb/cdbdisp.h"
#include "cdb/cdbdisp_query.h"
#include "cdb/cdbdisp_dtx.h"	/* for qdSerializeDtxContextInfo() */
#include "cdb/cdbdispatchresult.h"
#include "cdb/cdbcopy.h"
#include "executor/execUtils.h"

#define QUERY_STRING_TRUNCATE_SIZE (1024)

extern bool Test_print_direct_dispatch_info;

typedef struct ParamWalkerContext
{
	plan_tree_base_prefix base; /* Required prefix for
								 * plan_tree_walker/mutator */
	List	   *params;
} ParamWalkerContext;

/*
 * We need an array describing the relationship between a slice and
 * the number of "child" slices which depend on it.
 */
typedef struct
{
	int			sliceIndex;
	int			children;
	ExecSlice  *slice;
} SliceVec;

/*
 * Parameter structure for Greenplum Database Queries
 */
typedef struct DispatchCommandQueryParms
{
	/*
	 * The SQL command
	 */
	const char *strCommand;
	int			strCommandlen;
	char	   *serializedPlantree;
	int			serializedPlantreelen;
	char	   *serializedQueryDispatchDesc;
	int			serializedQueryDispatchDesclen;

	/*
	 * Additional information.
	 */
	char	   *serializedOidAssignments;
	int			serializedOidAssignmentslen;

	/*
	 * serialized DTX context string
	 */
	char	   *serializedDtxContextInfo;
	int			serializedDtxContextInfolen;
} DispatchCommandQueryParms;

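/*
 * Note (summary of the builders defined below, derived from this file):
 * cdbdisp_buildCommandQueryParms() fills only strCommand plus the DTX
 * context; cdbdisp_buildUtilityQueryParms() additionally serializes the
 * utility statement (and the Oid assignments, if any) into
 * serializedPlantree/serializedQueryDispatchDesc; and
 * cdbdisp_buildPlanQueryParms() serializes the full PlannedStmt and
 * QueryDispatchDesc of a planned query.
 */
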
static int fillSliceVector(SliceTable *sliceTable,
				int sliceIndex,
				SliceVec *sliceVector,
				int len);

static char *buildGpQueryString(DispatchCommandQueryParms *pQueryParms,
				   int *finalLen);

static DispatchCommandQueryParms *cdbdisp_buildPlanQueryParms(struct QueryDesc *queryDesc, bool planRequiresTxn);
static DispatchCommandQueryParms *cdbdisp_buildUtilityQueryParms(struct Node *stmt, int flags, List *oid_assignments);
static DispatchCommandQueryParms *cdbdisp_buildCommandQueryParms(const char *strCommand, int flags);

static void cdbdisp_dispatchCommandInternal(DispatchCommandQueryParms *pQueryParms,
											int flags, List *segments,
											CdbPgResults *cdb_pgresults);

static void
cdbdisp_dispatchX(QueryDesc *queryDesc,
			bool planRequiresTxn,
			bool cancelOnError);

static List *formIdleSegmentIdList(void);

static bool param_walker(Node *node, ParamWalkerContext *context);
static Oid	findParamType(List *params, int paramid);
static Bitmapset *getExecParamsToDispatch(PlannedStmt *stmt, ParamExecData *intPrm,
										  List **paramExecTypes);
static SerializedParams *serializeParamsForDispatch(QueryDesc *queryDesc,
													ParamListInfo externParams,
													ParamExecData *execParams,
													List *paramExecTypes,
													Bitmapset *sendParams);



/*
 * Compose and dispatch the MPPEXEC commands corresponding to a plan tree
 * within a complete parallel plan. (A plan tree will correspond either
 * to an initPlan or to the main plan.)
 *
 * 'execParams', 'paramExecTypes' and 'sendParams' describe executor
 * parameters (PARAM_EXEC) that should be sent with the query.
 * 'sendParams' indicates which parameters are included and 'execParams'
 * contains their values. 'paramExecTypes' is a list indexed by paramid,
 * containing the datatype OID of each parameter.
 * GPDB_11_MERGE_FIXME: In PostgreSQL v11, we have paramExecTypes in
 * PlannedStmt, so it will no longer be necessary to pass it as a param.
 *
 * If cancelOnError is true, then any dispatching error, a cancellation
 * request from the client, or an error from any of the associated QEs,
 * may cause the unfinished portion of the plan to be abandoned or canceled;
 * and in the event this occurs before all gangs have been dispatched, this
 * function does not return, but waits for all QEs to stop and exits to
 * the caller's error catcher via ereport(ERROR,...). Otherwise this
 * function returns normally and errors are not reported until later.
 *
 * If cancelOnError is false, the plan is to be dispatched as fully as
 * possible and the QEs allowed to proceed regardless of cancellation
 * requests, errors or connection failures from other QEs, etc.
 *
 * The CdbDispatchResults objects allocated for the plan are returned
 * in *pPrimaryResults. The caller, after calling
 * CdbCheckDispatchResult(), can examine the CdbDispatchResults
 * objects, can keep them as long as needed, and ultimately must free
 * them with cdbdisp_destroyDispatcherState() prior to deallocation of
 * the caller's memory context. Callers should use PG_TRY/PG_CATCH to
 * ensure proper cleanup.
 *
 * To wait for completion, check for errors, and clean up, it is
 * suggested that the caller use cdbdisp_finishCommand().
 *
 * Note that the slice tree dispatched is the one specified in the EState
 * of the argument QueryDesc as es_cur__slice.
 *
 * Each QE receives its assignment as a message of type 'M' in PostgresMain().
 * The message is deserialized and processed by exec_mpp_query() in postgres.c.
 */
void
CdbDispatchPlan(struct QueryDesc *queryDesc,
				ParamExecData *execParams,
				bool planRequiresTxn,
				bool cancelOnError)
{
	PlannedStmt *stmt;
	bool		is_SRI = false;
	List	   *paramExecTypes;
	Bitmapset  *sendParams;

	Assert(Gp_role == GP_ROLE_DISPATCH);
	Assert(queryDesc != NULL && queryDesc->estate != NULL);

	/*
	 * This function is called only for planned statements.
	 */
	stmt = queryDesc->plannedstmt;
	Assert(stmt);

	/*
	 * Let's evaluate STABLE functions now, so we get consistent values on the
	 * QEs
	 *
	 * Also, if this is a single-row INSERT statement, let's evaluate
	 * nextval() and currval() now, so that we get the QD's values, and a
	 * consistent value for everyone
	 */
	if (queryDesc->operation == CMD_INSERT)
	{
		Assert(stmt->commandType == CMD_INSERT);

		/*
		 * We might look for constant input relation (instead of SRI), but I'm
		 * afraid that wouldn't scale.
		 */
		is_SRI = IsA(stmt->planTree, Result) && stmt->planTree->lefttree == NULL;
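		/*
		 * Illustration (hedged, not in the original source): is_SRI is true
		 * only when the entire plan is a single childless Result node, i.e.
		 * the inserted row is computed wholly on the QD; any plan with a
		 * child below the top node is not treated as a single-row INSERT.
		 */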
	}

	if (queryDesc->operation == CMD_INSERT ||
		queryDesc->operation == CMD_SELECT ||
		queryDesc->operation == CMD_UPDATE ||
		queryDesc->operation == CMD_DELETE)
	{
		List	   *cursors;

		/*
		 * Need to be careful not to modify the original PlannedStmt, because
		 * it might be a cached plan. So make a copy. A shallow copy of the
		 * fields we don't modify should be enough.
		 */
		stmt = palloc(sizeof(PlannedStmt));
		memcpy(stmt, queryDesc->plannedstmt, sizeof(PlannedStmt));
		stmt->subplans = list_copy(stmt->subplans);

		stmt->planTree = (Plan *) exec_make_plan_constant(stmt, queryDesc->estate, is_SRI, &cursors);
		queryDesc->plannedstmt = stmt;

		queryDesc->ddesc->cursorPositions = (List *) copyObject(cursors);
	}

	/*
	 * Fill in parameter info.
	 *
	 * First, figure out which executor parameters (PARAM_EXEC) have valid
	 * values that need to be included with the query. Then serialize them,
	 * and also any PARAM_EXTERN parameters.
	 */
	sendParams = getExecParamsToDispatch(stmt, execParams, &paramExecTypes);
	queryDesc->ddesc->paramInfo =
		serializeParamsForDispatch(queryDesc,
								   queryDesc->params,
								   execParams, paramExecTypes, sendParams);

	/*
	 * Cursor queries and bind/execute path queries don't run on the
	 * writer-gang QEs; but they require snapshot-synchronization to get
	 * started.
	 *
	 * initPlans, and other work (see the function pre-evaluation above) may
	 * advance the snapshot "segmateSync" value, so we're best off setting the
	 * shared-snapshot-ready value here. This will dispatch to the writer gang
	 * and force it to set its snapshot; we'll then be able to serialize the
	 * same snapshot version (see qdSerializeDtxContextInfo() below).
	 */
	if (queryDesc->extended_query)
	{
		verify_shared_snapshot_ready(gp_command_count);
	}

	/* Finally, add the resource information that resource groups need on the QEs */
	stmt->total_memory_coordinator = 0;
	stmt->nsegments_coordinator = 0;

	if (IsResGroupEnabled() && gp_resource_group_enable_recalculate_query_mem &&
		memory_spill_ratio != RESGROUP_FALLBACK_MEMORY_SPILL_RATIO)
	{
		/*
		 * Resource groups are enabled to re-calculate query_mem on the QEs,
		 * and we are not in fallback mode (which uses statement_mem).
		 */
		stmt->total_memory_coordinator = ResGroupOps_GetTotalMemory();
		stmt->nsegments_coordinator = ResGroupGetHostPrimaryCount();
	}

	cdbdisp_dispatchX(queryDesc, planRequiresTxn, cancelOnError);
}

/*
 * A SET command cannot be dispatched to a named portal (such as a CURSOR):
 * the named portal may be busy, and it should not be affected by the SET
 * command anyway. Consequently, when the dispatcher state of a named portal
 * is destroyed, its gangs must not be recycled, because their GUCs were
 * never set; mark those gangs as not recyclable.
 */
static void
cdbdisp_markNamedPortalGangsDestroyed(void)
{
	dispatcher_handle_t *head = open_dispatcher_handles;
	while (head != NULL)
	{
		if (head->dispatcherState->isExtendedQuery)
			head->dispatcherState->forceDestroyGang = true;
		head = head->next;
	}
}

/*
 * Special path for sending SET commands that change GUC variables: they must
 * go to all gangs, both reader and writer.
 *
 * SET commands cannot be dispatched directly to busy reader gangs (those
 * allocated by cursors), because another command is already in progress on
 * them. Cursors only allocate reader gangs, so the primary writer gang and
 * the idle reader gangs can be dispatched to.
 */
void
CdbDispatchSetCommand(const char *strCommand, bool cancelOnError)
{
	CdbDispatcherState *ds;
	DispatchCommandQueryParms *pQueryParms;
	Gang *primaryGang;
	char	   *queryText;
	int		queryTextLength;
	ListCell   *le;
	ErrorData *qeError = NULL;

	elog((Debug_print_full_dtm ? LOG : DEBUG5),
		 "CdbDispatchSetCommand for command = '%s'",
		 strCommand);

	pQueryParms = cdbdisp_buildCommandQueryParms(strCommand, DF_NONE);

	ds = cdbdisp_makeDispatcherState(false);

	queryText = buildGpQueryString(pQueryParms, &queryTextLength);

	primaryGang = AllocateGang(ds, GANGTYPE_PRIMARY_WRITER, cdbcomponent_getCdbComponentsList());

	/* put all idle segments into one gang so the QD can send the SET command to them */
	AllocateGang(ds, GANGTYPE_PRIMARY_READER, formIdleSegmentIdList());
	
	cdbdisp_makeDispatchResults(ds, list_length(ds->allocatedGangs), cancelOnError);
	cdbdisp_makeDispatchParams(ds, list_length(ds->allocatedGangs), queryText, queryTextLength);

	foreach(le, ds->allocatedGangs)
	{
		Gang	   *rg = lfirst(le);

		cdbdisp_dispatchToGang(ds, rg, -1);
	}

	/*
	 * Register only the primary writer gang with the current global
	 * transaction; the reader gangs are not registered, as they do not
	 * participate in two-phase commit.
	 */
	addToGxactDtxSegments(primaryGang);

	cdbdisp_waitDispatchFinish(ds);

	cdbdisp_checkDispatchResult(ds, DISPATCH_WAIT_NONE);

	cdbdisp_getDispatchResults(ds, &qeError);

	/*
	 * For named portals (such as CURSORs), the SET command is not
	 * dispatched. Such gangs must not be reused, because their GUCs
	 * were not set.
	 */
	cdbdisp_markNamedPortalGangsDestroyed();

	if (qeError)
	{
		FlushErrorState();
		ReThrowError(qeError);
	}

	cdbdisp_destroyDispatcherState(ds);
}

/*
 * CdbDispatchCommand:
 *
 * Execute a plain command on all primary writer QEs.
 * If one or more QEs return an error, throw an error.
 *
 * -flags:
 *      A combination of DF_NEED_TWO_PHASE, DF_WITH_SNAPSHOT and
 *      DF_CANCEL_ON_ERROR.
 */
void
CdbDispatchCommand(const char *strCommand,
				   int flags,
				   CdbPgResults *cdb_pgresults)
{
	return CdbDispatchCommandToSegments(strCommand,
										flags,
										cdbcomponent_getCdbComponentsList(),
										cdb_pgresults);
}

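/*
 * Usage sketch (hypothetical, not in the original source; error handling
 * elided): a QD-side caller typically dispatches a command and then frees
 * the collected results, e.g.
 *
 *	CdbPgResults cdb_pgresults = {NULL, 0};
 *
 *	CdbDispatchCommand("CHECKPOINT",
 *					   DF_CANCEL_ON_ERROR | DF_WITH_SNAPSHOT,
 *					   &cdb_pgresults);
 *	cdbdisp_clearCdbPgResults(&cdb_pgresults);
 */
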
/*
 * Like CdbDispatchCommand, but sends the command only to the
 * specified segments.
 */
void
CdbDispatchCommandToSegments(const char *strCommand,
							 int flags,
							 List *segments,
							 CdbPgResults *cdb_pgresults)
{
	DispatchCommandQueryParms *pQueryParms;
	bool needTwoPhase = flags & DF_NEED_TWO_PHASE;

	if (needTwoPhase)
		setupDtxTransaction();

	elogif((Debug_print_full_dtm || log_min_messages <= DEBUG5), LOG,
		   "CdbDispatchCommand: %s (needTwoPhase = %s)",
		   strCommand, (needTwoPhase ? "true" : "false"));

	pQueryParms = cdbdisp_buildCommandQueryParms(strCommand, flags);

	return cdbdisp_dispatchCommandInternal(pQueryParms,
										   flags,
										   segments,
										   cdb_pgresults);
}

/*
 * CdbDispatchUtilityStatement
 *
 * Dispatch an already parsed statement to all primary writer QEs and wait
 * until all QEs finish successfully. If one or more QEs return an error,
 * throw an error.
 *
 * -flags:
 *      A combination of DF_NEED_TWO_PHASE, DF_WITH_SNAPSHOT and
 *      DF_CANCEL_ON_ERROR.
 *
 * -cdb_pgresults:
 *      If non-NULL, receives the PGresult objects from each QE connection.
 *
 */
void
CdbDispatchUtilityStatement(struct Node *stmt,
							int flags,
							List *oid_assignments,
							CdbPgResults *cdb_pgresults)
{
	DispatchCommandQueryParms *pQueryParms;
	bool needTwoPhase = flags & DF_NEED_TWO_PHASE;

	if (needTwoPhase)
		setupDtxTransaction();

	elogif((Debug_print_full_dtm || log_min_messages <= DEBUG5), LOG,
		   "CdbDispatchUtilityStatement: %s (needTwoPhase = %s)",
		   (PointerIsValid(debug_query_string) ? debug_query_string : "\"\""),
		   (needTwoPhase ? "true" : "false"));

	pQueryParms = cdbdisp_buildUtilityQueryParms(stmt, flags, oid_assignments);

	return cdbdisp_dispatchCommandInternal(pQueryParms,
										   flags,
										   cdbcomponent_getCdbComponentsList(),
										   cdb_pgresults);
}

static void
cdbdisp_dispatchCommandInternal(DispatchCommandQueryParms *pQueryParms,
								int flags,
								List *segments,
								CdbPgResults *cdb_pgresults)
{
	CdbDispatcherState *ds;
	Gang *primaryGang;
	CdbDispatchResults *pr;
	ErrorData *qeError = NULL;
	char *queryText;
	int queryTextLength;

	/*
	 * Dispatch the command.
	 */
	ds = cdbdisp_makeDispatcherState(false);

	/*
	 * Reader gangs use a local snapshot to access the catalog; as a result,
	 * they do not synchronize with the global snapshot of the writer gang,
	 * which leads to inconsistent visibility of catalog tables. Consider:
	 *
	 * select * from t, t t1; -- creates a reader gang.
	 * begin;
	 * create role r1;
	 * set role r1;  -- the SET command is also dispatched to idle reader gangs
	 *
	 * When the SET ROLE command is dispatched to the reader gang, the reader
	 * gang cannot see the new row for role r1 in the catalog table pg_authid.
	 * To fix this issue, we drop the idle reader gangs after each utility
	 * statement that may modify the catalog.
	 */
	ds->destroyIdleReaderGang = true;

	queryText = buildGpQueryString(pQueryParms, &queryTextLength);

	/*
	 * Allocate a primary QE on each segment in the target list.
	 */
	primaryGang = AllocateGang(ds, GANGTYPE_PRIMARY_WRITER, segments);
	Assert(primaryGang);

	cdbdisp_makeDispatchResults(ds, 1, flags & DF_CANCEL_ON_ERROR);
	cdbdisp_makeDispatchParams(ds, 1, queryText, queryTextLength);

	cdbdisp_dispatchToGang(ds, primaryGang, -1);

	if ((flags & DF_NEED_TWO_PHASE) != 0 || isDtxExplicitBegin())
		addToGxactDtxSegments(primaryGang);

	cdbdisp_waitDispatchFinish(ds);

	cdbdisp_checkDispatchResult(ds, DISPATCH_WAIT_NONE);

	pr = cdbdisp_getDispatchResults(ds, &qeError);

	if (qeError)
	{
		FlushErrorState();
		ReThrowError(qeError);
	}

	/* collect pgstat from QEs for current transaction level */
	pgstat_combine_from_qe(pr, -1);

	cdbdisp_returnResults(pr, cdb_pgresults);

	cdbdisp_destroyDispatcherState(ds);
}

static DispatchCommandQueryParms *
cdbdisp_buildCommandQueryParms(const char *strCommand, int flags)
{
	bool needTwoPhase = flags & DF_NEED_TWO_PHASE;
	bool withSnapshot = flags & DF_WITH_SNAPSHOT;
	DispatchCommandQueryParms *pQueryParms;

	pQueryParms = palloc0(sizeof(*pQueryParms));
	pQueryParms->strCommand = strCommand;
	pQueryParms->serializedQueryDispatchDesc = NULL;
	pQueryParms->serializedQueryDispatchDesclen = 0;

	/*
	 * Serialize a version of our DTX Context Info
	 */
	pQueryParms->serializedDtxContextInfo =
		qdSerializeDtxContextInfo(&pQueryParms->serializedDtxContextInfolen,
								  withSnapshot, false,
								  mppTxnOptions(needTwoPhase),
								  "cdbdisp_dispatchCommandInternal");

	return pQueryParms;
}

static DispatchCommandQueryParms *
cdbdisp_buildUtilityQueryParms(struct Node *stmt,
				int flags,
				List *oid_assignments)
{
	char *serializedPlantree = NULL;
	char *serializedQueryDispatchDesc = NULL;
	int serializedPlantree_len = 0;
	int serializedQueryDispatchDesc_len = 0;
	bool needTwoPhase = flags & DF_NEED_TWO_PHASE;
	bool withSnapshot = flags & DF_WITH_SNAPSHOT;
	QueryDispatchDesc *qddesc;
	PlannedStmt *pstmt;
	DispatchCommandQueryParms *pQueryParms;
	Oid	save_userid;

	Assert(stmt != NULL);
	Assert(stmt->type < 1000);
	Assert(stmt->type > 0);

	/* Wrap it in a PlannedStmt */
	pstmt = makeNode(PlannedStmt);
	pstmt->commandType = CMD_UTILITY;

	/*
	 * We must set pstmt->canSetTag = true.  False would be used to hide a
	 * command introduced by rule expansion which is not allowed to return its
	 * completion status in the command tag (PQcmdStatus/PQcmdTuples). For
	 * example, if the original unexpanded command was SELECT, the status
	 * should come back as "SELECT n" and should not reflect other commands
	 * inserted by rewrite rules.  True means we want the status.
	 */
	pstmt->canSetTag = true;
	pstmt->utilityStmt = stmt;
	pstmt->stmt_location = 0;
	pstmt->stmt_len = 0;

	/*
	 * Serialize the statement tree, and create the SQL statement: mppexec ...
	 */
	serializedPlantree = serializeNode((Node *) pstmt, &serializedPlantree_len,
									   NULL /* uncompressed_size */ );
	Assert(serializedPlantree != NULL);

	if (oid_assignments)
	{
		qddesc = makeNode(QueryDispatchDesc);
		qddesc->oidAssignments = oid_assignments;
		GetUserIdAndSecContext(&save_userid, &qddesc->secContext);

		serializedQueryDispatchDesc = serializeNode((Node *) qddesc, &serializedQueryDispatchDesc_len,
													NULL /* uncompressed_size */ );
	}

	pQueryParms = palloc0(sizeof(*pQueryParms));
	pQueryParms->strCommand = PointerIsValid(debug_query_string) ? debug_query_string : "";
	pQueryParms->serializedPlantree = serializedPlantree;
	pQueryParms->serializedPlantreelen = serializedPlantree_len;
	pQueryParms->serializedQueryDispatchDesc = serializedQueryDispatchDesc;
	pQueryParms->serializedQueryDispatchDesclen = serializedQueryDispatchDesc_len;

	/*
	 * Serialize a version of our DTX Context Info
	 */
	pQueryParms->serializedDtxContextInfo =
		qdSerializeDtxContextInfo(&pQueryParms->serializedDtxContextInfolen,
								  withSnapshot, false,
								  mppTxnOptions(needTwoPhase),
								  "cdbdisp_dispatchCommandInternal");

	return pQueryParms;
}

static DispatchCommandQueryParms *
cdbdisp_buildPlanQueryParms(struct QueryDesc *queryDesc,
							bool planRequiresTxn)
{
	char	   *splan,
			   *sddesc;

	int			splan_len,
				splan_len_uncompressed,
				sddesc_len,
				rootIdx;
	Oid			save_userid;

	rootIdx = RootSliceIndex(queryDesc->estate);

	DispatchCommandQueryParms *pQueryParms = (DispatchCommandQueryParms *) palloc0(sizeof(*pQueryParms));

	/*
	 * serialized plan tree. Note that we're called for a single slice tree
	 * (corresponding to an initPlan or the main plan), so the parameters are
	 * fixed and we can include them in the prefix.
	 */
	splan = serializeNode((Node *) queryDesc->plannedstmt, &splan_len, &splan_len_uncompressed);

	uint64		plan_size_in_kb = ((uint64) splan_len_uncompressed) / (uint64) 1024;

	elog(((gp_log_gang >= GPVARS_VERBOSITY_TERSE) ? LOG : DEBUG1),
		 "Query plan size to dispatch: " UINT64_FORMAT "KB", plan_size_in_kb);

	if (0 < gp_max_plan_size && plan_size_in_kb > gp_max_plan_size)
	{
		ereport(ERROR,
				(errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
				 (errmsg("Query plan size limit exceeded, current size: "
						 UINT64_FORMAT "KB, max allowed size: %dKB",
						 plan_size_in_kb, gp_max_plan_size),
				  errhint("Size controlled by gp_max_plan_size"))));
	}

	Assert(splan != NULL && splan_len > 0 && splan_len_uncompressed > 0);

	GetUserIdAndSecContext(&save_userid, &queryDesc->ddesc->secContext);
	sddesc = serializeNode((Node *) queryDesc->ddesc, &sddesc_len, NULL /* uncompressed_size */ );

	pQueryParms->strCommand = queryDesc->sourceText;
	pQueryParms->serializedPlantree = splan;
	pQueryParms->serializedPlantreelen = splan_len;
	pQueryParms->serializedQueryDispatchDesc = sddesc;
	pQueryParms->serializedQueryDispatchDesclen = sddesc_len;

	/*
	 * Serialize a version of our snapshot, and generate our transaction
	 * isolation. We generally want plan-based dispatch to run in a global
	 * transaction. The executor gets to decide if special circumstances
	 * exist which allow us to dispatch without starting a global xact.
	 */
	pQueryParms->serializedDtxContextInfo =
		qdSerializeDtxContextInfo(&pQueryParms->serializedDtxContextInfolen,
								  true /* wantSnapshot */ ,
								  queryDesc->extended_query,
								  mppTxnOptions(planRequiresTxn),
								  "cdbdisp_buildPlanQueryParms");

	return pQueryParms;
}

/*
 * Three Helper functions for cdbdisp_dispatchX:
 *
 * Used to figure out the dispatch order for the sliceTable by
 * counting the number of dependent child slices for each slice, and
 * then sorting based on the count (all independent slices get
 * dispatched first, then the slices above them, and so on).
 *
 * fillSliceVector: figure out the number of slices we're dispatching,
 * and order them.
 *
 * count_dependent_children(): walk tree counting up children.
 *
 * compare_slice_order(): comparison function for qsort(): order the
 * slices by the number of dependent children. Empty slices are
 * sorted last (to make this work with initPlans).
 *
 */
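/*
 * Worked example (illustrative, not in the original source): for a slice
 * tree 1 <- 2 <- {3, 4}, where every slice has a gang of the same size,
 * the dependent-children counts are 0 for slices 3 and 4, 2 for slice 2,
 * and 3 for slice 1, so after qsort the leaf slices 3 and 4 are dispatched
 * first, then slice 2, and the root slice 1 last.
 */
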
static int
compare_slice_order(const void *aa, const void *bb)
{
	SliceVec   *a = (SliceVec *) aa;
	SliceVec   *b = (SliceVec *) bb;

	if (a->slice == NULL)
		return 1;
	if (b->slice == NULL)
		return -1;

	/*
	 * Put slices that are not going to be dispatched last
	 */
	if (a->slice->primaryGang == NULL)
	{
		Assert(a->slice->gangType == GANGTYPE_UNALLOCATED);
		return 1;
	}
	if (b->slice->primaryGang == NULL)
	{
		Assert(b->slice->gangType == GANGTYPE_UNALLOCATED);
		return -1;
	}

	/* sort slices with larger gangs first because they have a bigger chance of containing writers */
	if (a->slice->primaryGang->size > b->slice->primaryGang->size)
		return -1;

	if (a->slice->primaryGang->size < b->slice->primaryGang->size)
		return 1;

	if (a->children == b->children)
		return 0;
	else if (a->children > b->children)
		return 1;
	else
		return -1;
}

/*
 * Quick and dirty bit mask operations
 */
static void
mark_bit(char *bits, int nth)
{
	int			nthbyte = nth >> 3;
	char		nthbit = 1 << (nth & 7);

	bits[nthbyte] |= nthbit;
}

static void
or_bits(char *dest, char *src, int n)
{
	int			i;

	for (i = 0; i < n; i++)
		dest[i] |= src[i];
}

static int
count_bits(char *bits, int nbyte)
{
	int			i;
	int			nbit = 0;

	int			bitcount[] =
	{
		0, 1, 1, 2, 1, 2, 2, 3, 1, 2, 2, 3, 2, 3, 3, 4
	};

	for (i = 0; i < nbyte; i++)
	{
		nbit += bitcount[bits[i] & 0x0F];
		nbit += bitcount[(bits[i] >> 4) & 0x0F];
	}

	return nbit;
}

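/*
 * Worked example (illustrative, not in the original source):
 * mark_bit(bits, 10) sets bit 2 of byte 1, since 10 >> 3 == 1 and
 * 10 & 7 == 2.  count_bits() then counts set bits one nibble at a time
 * through the 16-entry popcount table below: a byte holding 0xB4
 * (binary 10110100) contributes bitcount[0x4] + bitcount[0xB] = 1 + 3 = 4.
 */
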
/*
 * We use a bitmask to count dependent children. Because of input sharing,
 * the slices form a DAG, so we cannot simply walk down the tree and add up
 * the numbers of children: shared subtrees would be counted more than once,
 * giving too big a number.
 */
static int
markbit_dep_children(SliceTable *sliceTable, int sliceIdx,
					 SliceVec *sliceVec, int bitmasklen, char *bits)
{
	ListCell   *sublist;
	ExecSlice  *slice = &sliceTable->slices[sliceIdx];

	foreach(sublist, slice->children)
	{
		int			childIndex = lfirst_int(sublist);
		char	   *newbits = palloc0(bitmasklen);

		markbit_dep_children(sliceTable, childIndex,
							 sliceVec, bitmasklen, newbits);
		or_bits(bits, newbits, bitmasklen);
		mark_bit(bits, childIndex);
		pfree(newbits);
	}

	sliceVec[sliceIdx].sliceIndex = sliceIdx;
	sliceVec[sliceIdx].children = count_bits(bits, bitmasklen);
	sliceVec[sliceIdx].slice = slice;

	return sliceVec[sliceIdx].children;
}

/*
 * Count the dependent children, and fill in the sliceVector of dependent
 * children.
 */
static int
count_dependent_children(SliceTable *sliceTable, int sliceIndex,
						 SliceVec *sliceVector, int len)
{
	int			ret = 0;
	int			bitmasklen = (len + 7) >> 3;
	char	   *bitmask = palloc0(bitmasklen);

	ret = markbit_dep_children(sliceTable, sliceIndex, sliceVector, bitmasklen, bitmask);
	pfree(bitmask);

	return ret;
}

static int
fillSliceVector(SliceTable *sliceTbl, int rootIdx,
				SliceVec *sliceVector, int nTotalSlices)
{
	int			top_count;

	/*
	 * The count does not include the top slice, so add 1. Note that
	 * sliceVector is filled in place by count_dependent_children.
	 */
	top_count = 1 + count_dependent_children(sliceTbl, rootIdx, sliceVector, nTotalSlices);

	qsort(sliceVector, nTotalSlices, sizeof(SliceVec), compare_slice_order);

	return top_count;
}

/*
 * Build a query string to be dispatched to QE.
 */
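/*
 * Layout of the 'M' message assembled below, derived from the code in this
 * function (all integers are sent in network byte order):
 *
 *	'M' | int32 total length | gp_command_count | sessionUserId |
 *	outerUserId | currentUserId | statement start timestamp (high uint32,
 *	low uint32) | command_len | plantree_len | sddesc_len |
 *	dtxContextInfo_len | dtxContextInfo | command text | plantree | sddesc |
 *	numsegments | resgroupInfo.len | resgroupInfo | tempNamespaceId |
 *	tempToastNamespaceId
 */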
static char *
buildGpQueryString(DispatchCommandQueryParms *pQueryParms,
				   int *finalLen)
{
	const char *command = pQueryParms->strCommand;
	int			command_len;
	const char *plantree = pQueryParms->serializedPlantree;
	int			plantree_len = pQueryParms->serializedPlantreelen;
	const char *sddesc = pQueryParms->serializedQueryDispatchDesc;
	int			sddesc_len = pQueryParms->serializedQueryDispatchDesclen;
	const char *dtxContextInfo = pQueryParms->serializedDtxContextInfo;
	int			dtxContextInfo_len = pQueryParms->serializedDtxContextInfolen;
	int64		currentStatementStartTimestamp = GetCurrentStatementStartTimestamp();
	Oid			sessionUserId = GetSessionUserId();
	Oid			outerUserId = GetOuterUserId();
	Oid			currentUserId = GetUserId();
	int32		numsegments = getgpsegmentCount();
	StringInfoData resgroupInfo;
	Oid			tempNamespaceId, tempToastNamespaceId;

	int			tmp,
				len;
	uint32		n32;
	int			total_query_len;
	char	   *shared_query,
			   *pos;
	MemoryContext oldContext;

	/*
	 * Must allocate query text within DispatcherContext.
	 */
	Assert(DispatcherContext);
	oldContext = MemoryContextSwitchTo(DispatcherContext);

	/*
	 * If plantree is set then the query string is not so
	 * important, dispatch a truncated version to increase the performance.
	 *
	 * Here we only need to determine the truncated size, the actual work is
	 * done later when copying it to the result buffer.
	 *
	 * The +1 and -1 below are adjustments to accommodate terminating null
	 * character.
	 */
	command_len = strlen(command) + 1;
	if (plantree && command_len > QUERY_STRING_TRUNCATE_SIZE)
		command_len = pg_mbcliplen(command, command_len,
								   QUERY_STRING_TRUNCATE_SIZE-1) + 1;
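	/*
	 * Example (illustrative): a 10 KB statement dispatched together with a
	 * serialized plan is clipped to at most QUERY_STRING_TRUNCATE_SIZE - 1
	 * bytes plus the terminating '\0'; pg_mbcliplen() ensures the cut does
	 * not fall in the middle of a multi-byte character.
	 */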

	initStringInfo(&resgroupInfo);
	if (IsResGroupActivated())
		SerializeResGroupInfo(&resgroupInfo);

	total_query_len = 1 /* 'M' */ +
		sizeof(len) /* message length */ +
		sizeof(gp_command_count) +
		sizeof(sessionUserId) /* sessionUserIsSuper */ +
		sizeof(outerUserId) /* outerUserIsSuper */ +
		sizeof(currentUserId) +
		sizeof(n32) * 2 /* currentStatementStartTimestamp */ +
		sizeof(command_len) +
		sizeof(plantree_len) +
		sizeof(sddesc_len) +
		sizeof(dtxContextInfo_len) +
		dtxContextInfo_len +
		command_len +
		plantree_len +
		sddesc_len +
		sizeof(numsegments) +
		sizeof(resgroupInfo.len) +
		resgroupInfo.len +
		sizeof(tempNamespaceId) +
		sizeof(tempToastNamespaceId) +
		0;

	shared_query = palloc(total_query_len);

	pos = shared_query;

	*pos++ = 'M';

	pos += 4;					/* placeholder for message length */

	tmp = htonl(gp_command_count);
	memcpy(pos, &tmp, sizeof(gp_command_count));
	pos += sizeof(gp_command_count);

	tmp = htonl(sessionUserId);
	memcpy(pos, &tmp, sizeof(sessionUserId));
	pos += sizeof(sessionUserId);

	tmp = htonl(outerUserId);
	memcpy(pos, &tmp, sizeof(outerUserId));
	pos += sizeof(outerUserId);

	tmp = htonl(currentUserId);
	memcpy(pos, &tmp, sizeof(currentUserId));
	pos += sizeof(currentUserId);

	/*
	 * High order half first, since we're doing MSB-first
	 */
	n32 = (uint32) (currentStatementStartTimestamp >> 32);
	n32 = htonl(n32);
	memcpy(pos, &n32, sizeof(n32));
	pos += sizeof(n32);

	/*
	 * Now the low order half
	 */
	n32 = (uint32) currentStatementStartTimestamp;
	n32 = htonl(n32);
	memcpy(pos, &n32, sizeof(n32));
	pos += sizeof(n32);

	tmp = htonl(command_len);
	memcpy(pos, &tmp, sizeof(command_len));
	pos += sizeof(command_len);

	tmp = htonl(plantree_len);
	memcpy(pos, &tmp, sizeof(plantree_len));
	pos += sizeof(plantree_len);

	tmp = htonl(sddesc_len);
	memcpy(pos, &tmp, sizeof(tmp));
	pos += sizeof(tmp);

	tmp = htonl(dtxContextInfo_len);
	memcpy(pos, &tmp, sizeof(tmp));
	pos += sizeof(tmp);

	if (dtxContextInfo_len > 0)
	{
		memcpy(pos, dtxContextInfo, dtxContextInfo_len);
		pos += dtxContextInfo_len;
	}

	memcpy(pos, command, command_len);
	/* If command is truncated we need to set the terminating '\0' manually */
	pos[command_len - 1] = '\0';
	pos += command_len;

	if (plantree_len > 0)
	{
		memcpy(pos, plantree, plantree_len);
		pos += plantree_len;
	}

	if (sddesc_len > 0)
	{
		memcpy(pos, sddesc, sddesc_len);
		pos += sddesc_len;
	}

	tmp = htonl(numsegments);
	memcpy(pos, &tmp, sizeof(numsegments));
	pos += sizeof(numsegments);

	tmp = htonl(resgroupInfo.len);
	memcpy(pos, &tmp, sizeof(resgroupInfo.len));
	pos += sizeof(resgroupInfo.len);

	if (resgroupInfo.len > 0)
	{
		memcpy(pos, resgroupInfo.data, resgroupInfo.len);
		pos += resgroupInfo.len;
	}

	/* pass process local variables to QEs */
	GetTempNamespaceState(&tempNamespaceId, &tempToastNamespaceId);
	tempNamespaceId = htonl(tempNamespaceId);
	tempToastNamespaceId = htonl(tempToastNamespaceId);

	memcpy(pos, &tempNamespaceId, sizeof(tempNamespaceId));
	pos += sizeof(tempNamespaceId);
	memcpy(pos, &tempToastNamespaceId, sizeof(tempToastNamespaceId));
	pos += sizeof(tempToastNamespaceId);

	/*
	 * fill in length placeholder
	 */
	len = pos - shared_query - 1;
	tmp = htonl(len);
	memcpy(shared_query + 1, &tmp, sizeof(len));

	Assert(len + 1 == total_query_len);

	if (finalLen)
		*finalLen = len + 1;

	MemoryContextSwitchTo(oldContext);

	return shared_query;
}

/*
 * This function is used for dispatching sliced plans
 */
static void
cdbdisp_dispatchX(QueryDesc* queryDesc,
					bool planRequiresTxn,
					bool cancelOnError)
{
	SliceVec   *sliceVector = NULL;
	int			nSlices = 1;	/* slices this dispatch cares about */
	int			nTotalSlices = 1;	/* total slices in sliceTbl */

	int			iSlice;
	int			rootIdx;
	char	   *queryText = NULL;
	int			queryTextLength = 0;
	struct SliceTable *sliceTbl;
	struct EState *estate;
	CdbDispatcherState *ds;
	ErrorData *qeError = NULL;
	DispatchCommandQueryParms *pQueryParms;

	if (log_dispatch_stats)
		ResetUsage();

	estate = queryDesc->estate;
	sliceTbl = estate->es_sliceTable;
	Assert(sliceTbl != NULL);

	rootIdx = RootSliceIndex(estate);

	ds = cdbdisp_makeDispatcherState(queryDesc->extended_query);

	/*
	 * Since we intend to execute the plan, inventory the slice tree,
	 * allocate gangs, and associate them with slices.
	 *
	 * On return, gangs have been allocated and CDBProcess lists have
	 * been filled in the slice table.
	 *
	 * Note: this must be done before cdbdisp_buildPlanQueryParms.
	 */
	AssignGangs(ds, queryDesc);

	/*
	 * Traverse the slice tree in sliceTbl rooted at rootIdx and build a
	 * vector of slice indexes specifying the order of [potential] dispatch.
	 */
	nTotalSlices = sliceTbl->numSlices;
	sliceVector = palloc0(nTotalSlices * sizeof(SliceVec));
	nSlices = fillSliceVector(sliceTbl, rootIdx, sliceVector, nTotalSlices);
	/* Each slice table has a unique id. */
	sliceTbl->ic_instance_id = ++gp_interconnect_id;

	pQueryParms = cdbdisp_buildPlanQueryParms(queryDesc, planRequiresTxn);
	queryText = buildGpQueryString(pQueryParms, &queryTextLength);

	/*
	 * Allocate result array with enough slots for QEs of primary gangs.
	 */
	cdbdisp_makeDispatchResults(ds, nTotalSlices, cancelOnError);
	cdbdisp_makeDispatchParams(ds, nTotalSlices, queryText, queryTextLength);

	cdb_total_plans++;
	cdb_total_slices += nSlices;
	if (nSlices > cdb_max_slices)
		cdb_max_slices = nSlices;

	if (DEBUG1 >= log_min_messages)
	{
		char		msec_str[32];

		switch (check_log_duration(msec_str, false))
		{
			case 1:
			case 2:
				ereport(LOG,
						(errmsg("duration to start of dispatch send (root %d): %s ms",
								rootIdx, msec_str)));
				break;
		}
	}

	for (iSlice = 0; iSlice < nSlices; iSlice++)
	{
		Gang	   *primaryGang = NULL;
		ExecSlice  *slice;
		int			si = -1;

		Assert(sliceVector != NULL);

		slice = sliceVector[iSlice].slice;
		si = slice->sliceIndex;

		/*
		 * Is this a slice we should dispatch?
		 */
		if (slice && slice->gangType == GANGTYPE_UNALLOCATED)
		{
			Assert(slice->primaryGang == NULL);

			/*
			 * Most slices are dispatched, however, in many cases the root
			 * runs only on the QD and is not dispatched to the QEs.
			 */
			continue;
		}

		primaryGang = slice->primaryGang;
		Assert(primaryGang != NULL);
		AssertImply(queryDesc->extended_query,
					primaryGang->type == GANGTYPE_PRIMARY_READER ||
					primaryGang->type == GANGTYPE_SINGLETON_READER ||
					primaryGang->type == GANGTYPE_ENTRYDB_READER);

		if (si == slice->rootIndex)
			ds->rootGangSize = primaryGang->size;

		if (Test_print_direct_dispatch_info)
			elog(INFO, "(slice %d) Dispatch command to %s", slice->sliceIndex,
						segmentsToContentStr(slice->segments));

		/*
		 * Bail out if already got an error or cancellation request.
		 */
		if (cancelOnError)
		{
			if (ds->primaryResults->errcode)
				break;
			if (CancelRequested())
				break;
		}
		SIMPLE_FAULT_INJECTOR("before_one_slice_dispatched");

		cdbdisp_dispatchToGang(ds, primaryGang, si);
		if (planRequiresTxn || isDtxExplicitBegin())
			addToGxactDtxSegments(primaryGang);

		SIMPLE_FAULT_INJECTOR("after_one_slice_dispatched");
	}

	pfree(sliceVector);

	cdbdisp_waitDispatchFinish(ds);

	/*
	 * If we bailed out before all slices were dispatched, stop the QEs and
	 * throw an error.
	 */
	if (iSlice < nSlices)
	{
		elog(Debug_cancel_print ? LOG : DEBUG2,
			 "Plan dispatch canceled; dispatched %d of %d slices",
			 iSlice, nSlices);

		/*
		 * Cancel any QEs still running, and wait for them to terminate.
		 */
		cdbdisp_cancelDispatch(ds);

		/*
		 * Check and free the results of all gangs. If any QE had an error,
		 * report it and exit via PG_THROW.
		 */
		cdbdisp_getDispatchResults(ds, &qeError);

		if (qeError)
		{
			FlushErrorState();
			ReThrowError(qeError);
		}

		/*
		 * Wasn't an error, must have been an interrupt.
		 */
		CHECK_FOR_INTERRUPTS();

		/*
		 * Strange! Not an interrupt either.
		 */
		ereport(ERROR,
				(errcode(ERRCODE_INTERNAL_ERROR),
				 errmsg_internal("unable to dispatch plan")));
	}

	if (DEBUG1 >= log_min_messages)
	{
		char		msec_str[32];

		switch (check_log_duration(msec_str, false))
		{
			case 1:
			case 2:
				ereport(LOG,
						(errmsg("duration to dispatch out (root %d): %s ms",
								rootIdx, msec_str)));
				break;
		}
	}

	estate->dispatcherState = ds;
}

/*
 * Copy external query parameters from serialized form into a ParamListInfo.
 */
ParamListInfo
deserializeExternParams(SerializedParams *sparams)
{
	TupleRemapper *remapper;
	ParamListInfo paramLI;

	if (sparams->nExternParams == 0)
		return NULL;

	/*
	 * If a transient record type cache was included, load it into
	 * a TupleRemapper.
	 */
	if (sparams->transientTypes)
	{
		remapper = CreateTupleRemapper();
		TRHandleTypeLists(remapper, sparams->transientTypes);
	}
	else
		remapper = NULL;

	/*
	 * Build a new ParamListInfo.
	 */
	paramLI = palloc(offsetof(ParamListInfoData, params) +
					 sparams->nExternParams * sizeof(ParamExternData));
	/* this clears the callback fields, among others */
	memset(paramLI, 0, offsetof(ParamListInfoData, params));
	paramLI->numParams = sparams->nExternParams;

	/*
	 * Read the ParamExternDatas
	 */
	for (int i = 0; i < sparams->nExternParams; i++)
	{
		SerializedParamExternData *sprm = &sparams->externParams[i];
		ParamExternData *prm = &paramLI->params[i];

		prm->ptype = sprm->ptype;
		prm->isnull = sprm->isnull;
		prm->pflags = sprm->pflags;

		/* If remapping record types is needed, do it. */
		if (remapper && prm->ptype != InvalidOid)
			prm->value = TRRemapDatum(remapper, sprm->ptype, sprm->value);
		else
			prm->value = sprm->value;
	}

	return paramLI;
}

/*
 * CdbDispatchCopyStart allocates a writer gang and
 * dispatches the COPY command to the segments.
 *
 * In the COPY protocol, after a COPY command is dispatched, the response
 * is a PGresult object bearing a status code of PGRES_COPY_OUT or
 * PGRES_COPY_IN; the client can then use APIs such as
 * PQputCopyData/PQgetCopyData to copy data in or out.
 *
 * cdbdisp_checkDispatchResult() will block until all connections
 * have issued a PGRES_COPY_OUT/PGRES_COPY_IN PGresult response.
 */
void
CdbDispatchCopyStart(struct CdbCopy *cdbCopy, Node *stmt, int flags)
{
	DispatchCommandQueryParms *pQueryParms;
	char *queryText;
	int queryTextLength;
	CdbDispatcherState *ds;
	Gang *primaryGang;
	ErrorData *error = NULL;
	bool needTwoPhase = flags & DF_NEED_TWO_PHASE;

	if (needTwoPhase)
		setupDtxTransaction();

	elogif((Debug_print_full_dtm || log_min_messages <= DEBUG5), LOG,
		   "CdbDispatchCopyStart: %s (needTwoPhase = %s)",
		   (PointerIsValid(debug_query_string) ? debug_query_string : "\"\""),
		   (needTwoPhase ? "true" : "false"));

	pQueryParms = cdbdisp_buildUtilityQueryParms(stmt, flags, NULL);

	/*
	 * Dispatch the command.
	 */
	ds = cdbdisp_makeDispatcherState(false);

	queryText = buildGpQueryString(pQueryParms, &queryTextLength);

	/*
	 * Allocate a primary QE on each segment in cdbCopy->seglist.
	 */
	primaryGang = AllocateGang(ds, GANGTYPE_PRIMARY_WRITER, cdbCopy->seglist);
	Assert(primaryGang);

	cdbdisp_makeDispatchResults(ds, 1, flags & DF_CANCEL_ON_ERROR);
	cdbdisp_makeDispatchParams(ds, 1, queryText, queryTextLength);

	cdbdisp_dispatchToGang(ds, primaryGang, -1);
	if ((flags & DF_NEED_TWO_PHASE) != 0 || isDtxExplicitBegin())
		addToGxactDtxSegments(primaryGang);

	cdbdisp_waitDispatchFinish(ds);

	cdbdisp_checkDispatchResult(ds, DISPATCH_WAIT_NONE);

	if (!cdbdisp_getDispatchResults(ds, &error))
	{
		FlushErrorState();
		ReThrowError(error);
	}

	/*
	 * Note: do not call cdbdisp_finishCommand to destroy the dispatcher
	 * state here; subsequent PQputCopyData/PQgetCopyData calls will use
	 * these connections.
	 */
	cdbCopy->dispatcherState = ds;
}

void
CdbDispatchCopyEnd(struct CdbCopy *cdbCopy)
{
	CdbDispatcherState *ds;

	ds = cdbCopy->dispatcherState;
	cdbCopy->dispatcherState = NULL;
	cdbdisp_destroyDispatcherState(ds);
}

/*
 * Helper function used only by CdbDispatchSetCommand()
 *
 * Return a List of the segment ids that have idle segment dbs; the list
 * may contain duplicate segment ids. E.g., if segment 0 has two idle
 * segment dbs in the freelist, the list looks like 0 -> 0.
 */
static List *
formIdleSegmentIdList(void)
{
	CdbComponentDatabases	*cdbs;
	List					*segments = NIL;
	int						i, j;

	cdbs = cdbcomponent_getCdbComponents();

	if (cdbs->segment_db_info != NULL)
	{
		for (i = 0; i < cdbs->total_segment_dbs; i++)
		{
			CdbComponentDatabaseInfo *cdi = &cdbs->segment_db_info[i];
			for (j = 0; j < cdi->numIdleQEs; j++)
				segments = lappend_int(segments, cdi->config->segindex);
		}
	}

	return segments;
}


/*
 * Serialization of query parameters (ParamListInfos and executor params)
 *
 * When a query is dispatched from QD to QE, we also need to dispatch any
 * query parameters (contained in the ParamListInfo struct), and executor
 * parameters that have already been evaluated in the QD. We need to
 * serialize ParamListInfo, but there are a few complications:
 *
 * - ParamListInfo is not a Node type, so we cannot use the usual
 * nodeToStringBinary() function directly. We turn the array of
 * ParamExternDatas into a List of SerializedParamExternData nodes,
 * which we can then pass to nodeToStringBinary().
 *
 * - The paramFetch callback, which could be used in this process to fetch
 * parameter values on-demand, cannot be used in a different process.
 * Therefore, fetch all parameters before serializing them. When
 * deserializing, leave the callbacks NULL.
 *
 * - In order to deserialize correctly, the receiver needs the typlen and
 * typbyval information for each datatype. The receiver has access to the
 * catalogs, so it could look them up, but for the sake of simplicity and
 * robustness in the receiver, we include that information in
 * SerializedParamExternData.
 *
 * - RECORD types. Type information for transient record types is kept only
 * in backend-private memory, indexed by typmod. The recipient will not know
 * what a record type's typmod means, and record types can also be nested.
 * Because of that, if there are any RECORD parameters, we include a copy of
 * the whole transient record type cache.
 *
 * We form a SerializedParams struct, which contains enough information
 * to reconstruct the parameters in the QEs.
 *
 * XXX: Sending *all* record types can be quite bulky, but ATM there is no
 * easy way to extract just the needed record types.
 */
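/*
 * Example (illustrative, not in the original source): for "SELECT * FROM t
 * WHERE a = $1" with $1 bound as int4, one SerializedParamExternData is
 * filled with ptype = INT4OID, plen = 4 and pbyval = true, and no transient
 * record type cache is attached because no RECORD value is involved.
 */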
static SerializedParams *
serializeParamsForDispatch(QueryDesc *queryDesc,
						   ParamListInfo externParams,
						   ParamExecData *execParams,
						   List *paramExecTypes,
						   Bitmapset *sendParams)
{
	SerializedParams *result = makeNode(SerializedParams);
	bool		found_records = false;

	/* materialize Extern params */
	if (externParams)
	{
		result->nExternParams = externParams->numParams;
		result->externParams = palloc0(externParams->numParams * sizeof(SerializedParamExternData));

		for (int i = 0; i < externParams->numParams; i++)
		{
			ParamExternData *prm;
			SerializedParamExternData *sprm = &result->externParams[i];
			ParamExternData prmdata;

			/*
			 * First, use paramFetch to fetch any "lazy" parameters. (The callback
			 * function is of no use in the QE.)
			 */
			if (externParams->paramFetch != NULL)
				prm = externParams->paramFetch(externParams, i + 1, false, &prmdata);
			else
				prm = &externParams->params[i];

			sprm->value = prm->value;
			sprm->isnull = prm->isnull;
			sprm->pflags = prm->pflags;
			sprm->ptype = prm->ptype;

			if (OidIsValid(prm->ptype))
			{
				get_typlenbyval(prm->ptype, &sprm->plen, &sprm->pbyval);
				if (prm->ptype == RECORDOID && !prm->isnull)
					found_records = true;
			}
			else
			{
				sprm->plen = 0;
				sprm->pbyval = true;
			}
		}
	}

	/* materialize Exec params */
	if (!bms_is_empty(sendParams))
	{
		int			numExecParams = list_length(paramExecTypes);
		int			x;

		result->nExecParams = numExecParams;
		result->execParams = palloc0(numExecParams * sizeof(SerializedParamExecData));

		x = -1;
		while ((x = bms_next_member(sendParams, x)) >= 0)
		{
			ParamExecData *prm = &execParams[x];
			SerializedParamExecData *sprm = &result->execParams[x];
			Oid			ptype = list_nth_oid(paramExecTypes, x);

			sprm->value = prm->value;
			sprm->isnull = prm->isnull;
			sprm->isvalid = true;

			get_typlenbyval(ptype, &sprm->plen, &sprm->pbyval);
			if (ptype == RECORDOID && !prm->isnull)
				found_records = true;
		}
	}

	/*
	 * If there were any record types, include the transient record type cache.
	 */
	if (found_records)
		result->transientTypes = build_tuple_node_list(0);

	return result;
}

/*
 * Function: getExecParamsToDispatch()
 *
 * Determine which PARAM_EXEC values are valid, and should be included
 * when the query is dispatched to the QEs.
 *
 * When the query eventually runs (on the QD or a QE), it will have access
 * to these PARAM_EXEC values (locally or through serialization).
 *
 * Then, rather than lazily-evaluating the SubPlan to get its value (as for
 * an internal parameter), the plan will just use the value that's already
 * in the EState->es_param_exec_vals array.
 */
static Bitmapset *
getExecParamsToDispatch(PlannedStmt *stmt, ParamExecData *intPrm,
						List **paramExecTypes)
{
	ParamWalkerContext context;
	int			i;
	Plan	   *plan = stmt->planTree;
	int			nIntPrm = list_length(stmt->paramExecTypes);
	Bitmapset  *sendParams = NULL;

	if (nIntPrm == 0)
	{
		*paramExecTypes = NIL;
		return NULL;
	}
	Assert(intPrm != NULL);		/* So there must be some internal parameters. */

	/*
	 * Walk the plan, looking for Param nodes of kind PARAM_EXEC, i.e.,
	 * executor internal parameters.
	 *
	 * We need these for their paramtype field, which isn't available in
	 * either the ParamExecData struct or the SubPlan struct.
	 */

	exec_init_plan_tree_base(&context.base, stmt);
	context.params = NIL;
	param_walker((Node *) plan, &context);

	/*
	 * MPP-25490: subplan X may be nested within subplan Y, so also run
	 * param_walker over the subplans list.
	 */
	if (list_length(context.params) < nIntPrm)
	{
		ListCell   *sb;

		foreach(sb, stmt->subplans)
		{
			Node	   *subplan = lfirst(sb);

			param_walker((Node *) subplan, &context);
		}
	}

	/*
	 * Now set the bit corresponding to each init plan param. Use the datatype
	 * info harvested above.
	 */
	*paramExecTypes = NIL;
	for (i = 0; i < nIntPrm; i++)
	{
		Oid			paramType = findParamType(context.params, i);

		*paramExecTypes = lappend_oid(*paramExecTypes, paramType);
		if (paramType != InvalidOid)
		{
			sendParams = bms_add_member(sendParams, i);
		}
		else
		{
			/*
			 * Param of id i not found. Likely has been removed by constant
			 * folding.
			 */
		}
	}

	list_free(context.params);

	return sendParams;
}

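/*
 * Example (illustrative, not in the original source): for a plan whose
 * initPlan computes "(SELECT max(b) FROM t2)" into PARAM_EXEC slot 0,
 * param_walker() finds the corresponding Param node and records its
 * paramtype, so bit 0 is set in sendParams and the value pre-evaluated on
 * the QD travels to the QEs with the dispatched query.
 */
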
/*
 * Helper function param_walker() walks a plan and adds any Param nodes
 * to a list in the ParamWalkerContext.
 *
 * This list is input to the function findParamType(), which loops over the
 * list looking for a specific paramid, and returns its type.
 */
static bool
param_walker(Node *node, ParamWalkerContext *context)
{
	if (node == NULL)
		return false;

	if (nodeTag(node) == T_Param)
	{
		Param	   *param = (Param *) node;

		if (param->paramkind == PARAM_EXEC)
		{
			context->params = lappend(context->params, param);
			return false;
		}
	}
	return plan_tree_walker(node, param_walker, context, true);
}

/*
 * Helper function findParamType() iterates over a list of Param nodes,
 * trying to match on the passed-in paramid. Returns the paramtype of a
 * match, or InvalidOid if none is found.
 */
static Oid
findParamType(List *params, int paramid)
{
	ListCell   *l;

	foreach(l, params)
	{
		Param	   *p = lfirst(l);

		if (p->paramid == paramid)
			return p->paramtype;
	}

	return InvalidOid;
}

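Usage overview

The functions above share one dispatch life cycle: build the query parms,
create a dispatcher state, allocate gangs, dispatch, wait, check, collect
results, and destroy the state. The following is a minimal, hypothetical
QD-side sketch of that cycle using only identifiers defined or used in this
file; it is an illustration, not code from the Greenplum tree, and error
handling and DTX registration are elided:

static void
dispatch_sketch(const char *queryText, int queryTextLength, List *segments)
{
	CdbDispatcherState *ds;
	Gang	   *gang;
	ErrorData  *qeError = NULL;

	/* 1. Create the dispatcher state that owns gangs and results. */
	ds = cdbdisp_makeDispatcherState(false /* isExtendedQuery */ );

	/* 2. Allocate a primary writer gang on the target segments. */
	gang = AllocateGang(ds, GANGTYPE_PRIMARY_WRITER, segments);

	/* 3. Size the result array and attach the query text. */
	cdbdisp_makeDispatchResults(ds, 1, true /* cancelOnError */ );
	cdbdisp_makeDispatchParams(ds, 1, queryText, queryTextLength);

	/* 4. Send the serialized 'M' message to every QE in the gang. */
	cdbdisp_dispatchToGang(ds, gang, -1);

	/* 5. Wait for the sends to finish and for the QEs to respond. */
	cdbdisp_waitDispatchFinish(ds);
	cdbdisp_checkDispatchResult(ds, DISPATCH_WAIT_NONE);

	/* 6. Collect the results and rethrow any QE error on the QD. */
	cdbdisp_getDispatchResults(ds, &qeError);
	if (qeError)
	{
		FlushErrorState();
		ReThrowError(qeError);
	}

	/* 7. Release the gangs and the dispatcher memory. */
	cdbdisp_destroyDispatcherState(ds);
}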