greenplumn cdbdispatchresult 源码
greenplumn cdbdispatchresult 代码
文件路径:/src/backend/cdb/dispatcher/cdbdispatchresult.c
/*-------------------------------------------------------------------------
*
* cdbdispatchresult.c
* Functions for handling dispatch results
*
*
* Portions Copyright (c) 2005-2008, Greenplum inc
* Portions Copyright (c) 2012-Present VMware, Inc. or its affiliates.
*
*
* IDENTIFICATION
* src/backend/cdb/dispatcher/cdbdispatchresult.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "libpq-fe.h" /* prerequisite for libpq-int.h */
#include "libpq-int.h" /* PQExpBufferData */
#include "utils/guc.h" /* log_min_messages */
#include "cdb/cdbconn.h" /* SegmentDatabaseDescriptor */
#include "cdb/cdbvars.h"
#include "cdb/cdbsreh.h"
#include "cdb/cdbdispatchresult.h"
static int cdbdisp_snatchPGresults(CdbDispatchResult *dispatchResult,
struct pg_result **pgresultptrs, int maxresults);
static void
noTrailingNewlinePQ(PQExpBuffer buf)
{
while (buf->len > 0 && buf->data[buf->len - 1] <= ' ' && buf->data[buf->len - 1] > '\0')
buf->data[--buf->len] = '\0';
}
static void
oneTrailingNewlinePQ(PQExpBuffer buf)
{
noTrailingNewlinePQ(buf);
if (buf->len > 0)
appendPQExpBufferChar(buf, '\n');
}
/*
* Create a CdbDispatchResult object, appending it to the
* resultArray of a given CdbDispatchResults object.
*/
CdbDispatchResult *
cdbdisp_makeResult(struct CdbDispatchResults *meleeResults,
struct SegmentDatabaseDescriptor *segdbDesc, int sliceIndex)
{
CdbDispatchResult *dispatchResult;
int meleeIndex;
Assert(meleeResults &&
meleeResults->resultArray &&
meleeResults->resultCount < meleeResults->resultCapacity);
/*
* Allocate a slot for the new CdbDispatchResult object.
*/
meleeIndex = meleeResults->resultCount++;
dispatchResult = &meleeResults->resultArray[meleeIndex];
/*
* Initialize CdbDispatchResult.
*/
dispatchResult->meleeResults = meleeResults;
dispatchResult->meleeIndex = meleeIndex;
dispatchResult->segdbDesc = segdbDesc;
dispatchResult->resultbuf = createPQExpBuffer();
dispatchResult->error_message = createPQExpBuffer();
dispatchResult->numrowsrejected = 0;
dispatchResult->numrowscompleted = 0;
dispatchResult->ackPGNotifies = NULL;
#ifdef FAULT_INJECTOR
if (SIMPLE_FAULT_INJECTOR("make_dispatch_result_error") == FaultInjectorTypeSkip)
{
/*
* Inject a fault to simulate the createPQExpBuffer return NULL (maybe because of
* malloc failure, this will lead to enter the below if block and return NULL in
* this function. We need to test this code path to verify the gang clean up code
* is correct.
*/
dispatchResult->resultbuf = NULL;
}
#endif
if (PQExpBufferBroken(dispatchResult->resultbuf) ||
PQExpBufferBroken(dispatchResult->error_message))
{
destroyPQExpBuffer(dispatchResult->resultbuf);
dispatchResult->resultbuf = NULL;
destroyPQExpBuffer(dispatchResult->error_message);
dispatchResult->error_message = NULL;
/*
* caller is responsible for cleanup -- can't elog(ERROR, ...) from
* here.
*/
return NULL;
}
/*
* Reset summary indicators.
*/
cdbdisp_resetResult(dispatchResult);
/*
* Update slice map entry.
*/
if (sliceIndex >= 0 && sliceIndex < meleeResults->sliceCapacity)
{
CdbDispatchResults_SliceInfo *si = &meleeResults->sliceMap[sliceIndex];
if (si->resultBegin == si->resultEnd)
{
si->resultBegin = meleeIndex;
si->resultEnd = meleeIndex + 1;
}
else
{
if (si->resultBegin > meleeIndex)
si->resultBegin = meleeIndex;
if (si->resultEnd <= meleeIndex)
si->resultEnd = meleeIndex + 1;
}
}
return dispatchResult;
}
/*
* Destroy a CdbDispatchResult object.
*/
void
cdbdisp_termResult(CdbDispatchResult *dispatchResult)
{
PQExpBuffer trash;
dispatchResult->segdbDesc = NULL;
/*
* Free the PGresult objects.
*/
cdbdisp_resetResult(dispatchResult);
/*
* Free the error message buffer and result buffer.
*/
trash = dispatchResult->resultbuf;
dispatchResult->resultbuf = NULL;
destroyPQExpBuffer(trash);
trash = dispatchResult->error_message;
dispatchResult->error_message = NULL;
destroyPQExpBuffer(trash);
}
/*
* Reset a CdbDispatchResult object for possible reuse.
*/
void
cdbdisp_resetResult(CdbDispatchResult *dispatchResult)
{
PQExpBuffer buf = dispatchResult->resultbuf;
/*
* the resultbuf may be empty due to oom, set in cdbdisp_makeResult()
* Related to issue: https://github.com/greenplum-db/gpdb/issues/12399
*/
if (buf)
{
PGresult **begp = (PGresult **) buf->data;
PGresult **endp = (PGresult **) (buf->data + buf->len);
PGresult **p;
/*
* Free the PGresult objects.
*/
for (p = begp; p < endp; ++p)
{
Assert(*p != NULL);
PQclear(*p);
}
}
PGnotify* pgnotify = (PGnotify *) dispatchResult->ackPGNotifies;
while (pgnotify)
{
PGnotify* temp = pgnotify;
pgnotify = temp->next;
PQfreemem(temp);
}
dispatchResult->ackPGNotifies = NULL;
/*
* Reset summary indicators.
*/
dispatchResult->errcode = 0;
dispatchResult->okindex = -1;
/*
* Reset progress indicators.
*/
dispatchResult->hasDispatched = false;
dispatchResult->stillRunning = false;
dispatchResult->receivedAckMsg = false;
dispatchResult->sentSignal = DISPATCH_WAIT_NONE;
dispatchResult->wasCanceled = false;
/*
* Empty (but don't free) the error message buffer and result buffer.
*/
resetPQExpBuffer(dispatchResult->resultbuf);
resetPQExpBuffer(dispatchResult->error_message);
}
/*
* Take note of an error.
* 'errcode' is the ERRCODE_xxx value for setting the client's SQLSTATE.
*/
void
cdbdisp_seterrcode(int errcode, /* ERRCODE_xxx or 0 */
int resultIndex, /* -1 if no PGresult */
CdbDispatchResult *dispatchResult)
{
CdbDispatchResults *meleeResults = dispatchResult->meleeResults;
/*
* We must ensure a nonzero errcode.
*/
if (!errcode)
errcode = ERRCODE_INTERNAL_ERROR;
/*
* Was the command canceled?
*/
if (errcode == ERRCODE_GP_OPERATION_CANCELED ||
errcode == ERRCODE_QUERY_CANCELED)
dispatchResult->wasCanceled = true;
/*
* If this is the first error from this QE, save the error code and the
* index of the PGresult buffer entry. We assume the caller has not yet
* added the item to the PGresult buffer.
*/
if (!dispatchResult->errcode)
{
dispatchResult->errcode = errcode;
}
if (!meleeResults)
return;
/*
* Remember which QE reported an error first among the gangs, but keep
* quiet about cancellation done at our request.
*
* Interconnection errors are given lower precedence because often they
* are secondary to an earlier and more interesting error.
*/
if (errcode == ERRCODE_GP_OPERATION_CANCELED &&
dispatchResult->sentSignal == DISPATCH_WAIT_CANCEL)
{
/* nop */
}
else if (meleeResults->errcode == 0 ||
(meleeResults->errcode == ERRCODE_GP_INTERCONNECTION_ERROR &&
errcode != ERRCODE_GP_INTERCONNECTION_ERROR))
{
meleeResults->errcode = errcode;
meleeResults->iFirstError = dispatchResult->meleeIndex;
}
}
/*
* NonThread version of cdbdisp_appendMessage.
*
* It's safe to use palloc/pfree or elog/ereport.
*/
void
cdbdisp_appendMessageNonThread(CdbDispatchResult *dispatchResult,
int elevel, const char *fmt,...)
{
va_list args;
int msgoff;
/*
* Remember first error.
*/
cdbdisp_seterrcode(ERRCODE_GP_INTERCONNECTION_ERROR, -1, dispatchResult);
/*
* Allocate buffer if first message. Insert newline between previous
* message and new one.
*/
Assert(dispatchResult->error_message != NULL);
oneTrailingNewlinePQ(dispatchResult->error_message);
msgoff = dispatchResult->error_message->len;
/*
* Format the message and append it to the buffer.
*/
va_start(args, fmt);
appendPQExpBufferVA(dispatchResult->error_message, fmt, args);
va_end(args);
/*
* Display the message on stderr for debugging, if requested. This helps
* to clarify the actual timing of threaded events.
*/
if (elevel >= log_min_messages)
{
oneTrailingNewlinePQ(dispatchResult->error_message);
elog(LOG, "%s", dispatchResult->error_message->data + msgoff);
}
/*
* In case the caller wants to hand the buffer to ereport(), follow the
* ereport() convention of not ending with a newline.
*/
noTrailingNewlinePQ(dispatchResult->error_message);
}
/*
* Store a PGresult object ptr in the result buffer.
* NB: Caller must not PQclear() the PGresult object.
*/
void
cdbdisp_appendResult(CdbDispatchResult *dispatchResult, struct pg_result *res)
{
Assert(dispatchResult && res);
/*
* Attach the QE identification string to the PGresult
*/
if (dispatchResult->segdbDesc && dispatchResult->segdbDesc->whoami)
pqSaveMessageField(res, PG_DIAG_GP_PROCESS_TAG, dispatchResult->segdbDesc->whoami);
appendBinaryPQExpBuffer(dispatchResult->resultbuf, (char *) &res, sizeof(res));
}
/*
* Return the i'th PGresult object ptr (if i >= 0), or
* the n+i'th one (if i < 0), or NULL (if i out of bounds).
* NB: Caller must not PQclear() the PGresult object.
*/
struct pg_result *
cdbdisp_getPGresult(CdbDispatchResult *dispatchResult, int i)
{
if (dispatchResult)
{
PQExpBuffer buf = dispatchResult->resultbuf;
PGresult **begp = (PGresult **) buf->data;
PGresult **endp = (PGresult **) (buf->data + buf->len);
PGresult **p = (i >= 0) ? &begp[i] : &endp[i];
if (p >= begp && p < endp)
{
Assert(*p != NULL);
return *p;
}
}
return NULL;
}
/*
* Return the number of PGresult objects in the result buffer.
*/
int
cdbdisp_numPGresult(CdbDispatchResult *dispatchResult)
{
return dispatchResult ? dispatchResult->resultbuf->len / sizeof(PGresult *) : 0;
}
/*
* Display a CdbDispatchResult in the log for debugging.
* Call only from main thread, during or after cdbdisp_checkDispatchResults.
*/
void
cdbdisp_debugDispatchResult(CdbDispatchResult *dispatchResult)
{
int ires;
int nres;
Assert(dispatchResult != NULL);
/*
* PGresult messages
*/
nres = cdbdisp_numPGresult(dispatchResult);
for (ires = 0; ires < nres; ++ires)
{
PGresult *pgresult = cdbdisp_getPGresult(dispatchResult, ires);
ExecStatusType resultStatus = PQresultStatus(pgresult);
char *whoami = PQresultErrorField(pgresult, PG_DIAG_GP_PROCESS_TAG);
if (!whoami)
whoami = "no process id";
if (resultStatus == PGRES_COMMAND_OK ||
resultStatus == PGRES_TUPLES_OK ||
resultStatus == PGRES_COPY_IN ||
resultStatus == PGRES_COPY_OUT ||
resultStatus == PGRES_EMPTY_QUERY)
{
char *cmdStatus = PQcmdStatus(pgresult);
elog(LOG, "DispatchResult from %s: ok %s (%s)",
dispatchResult->segdbDesc->whoami,
cmdStatus ? cmdStatus : "(no cmdStatus)", whoami);
}
else
{
char *sqlstate = PQresultErrorField(pgresult, PG_DIAG_SQLSTATE);
char *pri = PQresultErrorField(pgresult, PG_DIAG_MESSAGE_PRIMARY);
char *dtl = PQresultErrorField(pgresult, PG_DIAG_MESSAGE_DETAIL);
char *sourceFile = PQresultErrorField(pgresult, PG_DIAG_SOURCE_FILE);
char *sourceLine = PQresultErrorField(pgresult, PG_DIAG_SOURCE_LINE);
int lenpri = (pri == NULL) ? 0 : strlen(pri);
if (!sqlstate)
sqlstate = "no SQLSTATE";
while (lenpri > 0 && pri[lenpri - 1] <= ' ' && pri[lenpri - 1] > '\0')
lenpri--;
ereport(LOG,
(errmsg("DispatchResult from %s: error (%s) %s %.*s (%s)",
dispatchResult->segdbDesc->whoami,
sqlstate,
PQresStatus(resultStatus),
lenpri,
pri ? pri : "", whoami),
errdetail("(%s:%s) %s",
sourceFile ? sourceFile : "unknown file",
sourceLine ? sourceLine : "unknown line",
dtl ? dtl : "")));
}
}
/*
* Error found on our side of the libpq interface?
*/
if (dispatchResult->error_message &&
dispatchResult->error_message->len > 0)
{
char esqlstate[6];
errcode_to_sqlstate(dispatchResult->errcode, esqlstate);
elog(LOG, "DispatchResult from %s: connect error (%s) %s",
dispatchResult->segdbDesc->whoami,
esqlstate, dispatchResult->error_message->data);
}
/*
* Should have either an error code or an ok result.
*/
if (dispatchResult->errcode == 0 && dispatchResult->okindex < 0)
{
elog(LOG, "DispatchResult from %s: No ending status.",
dispatchResult->segdbDesc->whoami);
}
}
/*
* Construct an ErrorData from the dispatch results.
*/
ErrorData *
cdbdisp_dumpDispatchResult(CdbDispatchResult *dispatchResult)
{
int ires;
int nres;
ErrorData *errdata;
if (!dispatchResult)
return NULL;
/*
* Format PGresult messages
*/
nres = cdbdisp_numPGresult(dispatchResult);
for (ires = 0; ires < nres; ++ires)
{
PGresult *pgresult = cdbdisp_getPGresult(dispatchResult, ires);
errdata = cdbdisp_get_PQerror(pgresult);
if (errdata)
return errdata;
}
/*
* Error found on our side of the libpq interface?
*/
if (dispatchResult->error_message &&
dispatchResult->error_message->len > 0)
{
if (errstart(ERROR, TEXTDOMAIN))
{
errcode(ERRCODE_GP_INTERCONNECTION_ERROR);
errmsg("%s", dispatchResult->error_message->data);
errdata = errfinish_and_return(__FILE__, __LINE__, PG_FUNCNAME_MACRO);
}
else
pg_unreachable();
return errdata;
}
return NULL;
}
/*
* The returned error object is allocated in TopTransactionContext.
*
* Caution: do not use the returned object across transaction boundary.
* Current usages of this API are such that the returned object is either
* logged using elog() or rethrown, both within a transaction context, at the
* time of finishing a dispatched command. The caution applies to future uses
* of this function.
*/
ErrorData *
cdbdisp_get_PQerror(PGresult *pgresult)
{
MemoryContext oldcontext;
ExecStatusType resultStatus = PQresultStatus(pgresult);
/*
* QE success
*/
if (resultStatus == PGRES_COMMAND_OK ||
resultStatus == PGRES_TUPLES_OK ||
resultStatus == PGRES_COPY_IN ||
resultStatus == PGRES_COPY_OUT ||
resultStatus == PGRES_EMPTY_QUERY)
{
return NULL;
}
/*
* QE error or libpq error
*/
/* These will be overwritten below with the values from QE, if the QE sent them. */
char *filename = __FILE__;
int lineno = __LINE__;
const char *funcname = PG_FUNCNAME_MACRO;
int qe_errcode = ERRCODE_GP_INTERCONNECTION_ERROR;
char *whoami;
char *fld;
/*
* errstart need a const filename and funcname, make sure they
* are at least const in this transaction.
*/
oldcontext = MemoryContextSwitchTo(TopTransactionContext);
fld = PQresultErrorField(pgresult, PG_DIAG_SOURCE_FILE);
if (fld)
filename = pstrdup(fld);
fld = PQresultErrorField(pgresult, PG_DIAG_SOURCE_LINE);
if (fld)
lineno = atoi(fld);
fld = PQresultErrorField(pgresult, PG_DIAG_SOURCE_FUNCTION);
if (fld)
funcname = pstrdup(fld);
MemoryContextSwitchTo(oldcontext);
/*
* We should only get errors with ERROR level or above, if the
* command failed. And if a QE disconnected with FATAL, or PANICed,
* we don't want to do the same in the QD. So, always an ERROR.
*/
if (!errstart(ERROR, TEXTDOMAIN))
pg_unreachable(); /* unexpected path. */
fld = PQresultErrorField(pgresult, PG_DIAG_SQLSTATE);
if (fld)
qe_errcode = sqlstate_to_errcode(fld);
errcode(qe_errcode);
whoami = PQresultErrorField(pgresult, PG_DIAG_GP_PROCESS_TAG);
fld = PQresultErrorField(pgresult, PG_DIAG_MESSAGE_PRIMARY);
if (!fld)
fld = "no primary message received";
if (whoami)
errmsg("%s (%s)", fld, whoami);
else
errmsg("%s", fld);
fld = PQresultErrorField(pgresult, PG_DIAG_MESSAGE_DETAIL);
if (fld)
errdetail("%s", fld);
fld = PQresultErrorField(pgresult, PG_DIAG_MESSAGE_HINT);
if (fld)
errhint("%s", fld);
fld = PQresultErrorField(pgresult, PG_DIAG_CONTEXT);
if (fld)
errcontext("%s", fld);
fld = PQresultErrorField(pgresult, PG_DIAG_SCHEMA_NAME);
if (fld)
err_generic_string(PG_DIAG_SCHEMA_NAME, fld);
fld = PQresultErrorField(pgresult, PG_DIAG_TABLE_NAME);
if (fld)
err_generic_string(PG_DIAG_TABLE_NAME, fld);
fld = PQresultErrorField(pgresult, PG_DIAG_COLUMN_NAME);
if (fld)
err_generic_string(PG_DIAG_COLUMN_NAME, fld);
fld = PQresultErrorField(pgresult, PG_DIAG_DATATYPE_NAME);
if (fld)
err_generic_string(PG_DIAG_DATATYPE_NAME, fld);
fld = PQresultErrorField(pgresult, PG_DIAG_CONSTRAINT_NAME);
if (fld)
err_generic_string(PG_DIAG_CONSTRAINT_NAME, fld);
Assert(TopTransactionContext);
oldcontext = MemoryContextSwitchTo(TopTransactionContext);
ErrorData *edata = errfinish_and_return(filename, lineno, funcname);
MemoryContextSwitchTo(oldcontext);
return edata;
}
/*
* Format a CdbDispatchResults object.
* Returns an ErrorData object in *qeError if some error was found, or NIL if no errors.
* Before calling this function, you must call CdbCheckDispatchResult().
*/
void
cdbdisp_dumpDispatchResults(struct CdbDispatchResults *meleeResults,
ErrorData **qeError)
{
CdbDispatchResult *dispatchResult;
/*
* Quick exit if no error (not counting ERRCODE_GP_OPERATION_CANCELED).
*/
if (!meleeResults || !meleeResults->errcode)
{
*qeError = NULL;
return;
}
/*
* Find the CdbDispatchResult of the first QE that got an error.
*/
Assert(meleeResults->iFirstError >= 0 &&
meleeResults->iFirstError < meleeResults->resultCount);
dispatchResult = &meleeResults->resultArray[meleeResults->iFirstError];
Assert(dispatchResult->meleeResults == meleeResults &&
dispatchResult->errcode != 0);
/*
* Format one QE's result.
*/
*qeError = cdbdisp_dumpDispatchResult(dispatchResult);
}
/*
* Return sum of the cmdTuples values from CdbDispatchResult
* entries that have a successful PGresult. If sliceIndex >= 0,
* uses only the results belonging to the specified slice.
*/
int64
cdbdisp_sumCmdTuples(CdbDispatchResults *results, int sliceIndex)
{
CdbDispatchResult *dispatchResult;
CdbDispatchResult *resultEnd = cdbdisp_resultEnd(results, sliceIndex);
PGresult *pgresult;
int64 sum = 0;
for (dispatchResult = cdbdisp_resultBegin(results, sliceIndex);
dispatchResult < resultEnd; ++dispatchResult)
{
pgresult = cdbdisp_getPGresult(dispatchResult, dispatchResult->okindex);
if (pgresult && !dispatchResult->errcode)
{
char *cmdTuples = PQcmdTuples(pgresult);
if (cmdTuples)
sum += atoll(cmdTuples);
}
}
return sum;
}
/*
* If several tuples were eliminated/rejected from the result because of
* bad data formatting (this is currenly only possible in external tables
* with single row error handling) - sum up the total rows rejected from
* all QE's and notify the client.
*/
void
cdbdisp_sumRejectedRows(CdbDispatchResults *results)
{
CdbDispatchResult *dispatchResult;
CdbDispatchResult *resultEnd = cdbdisp_resultEnd(results, -1);
PGresult *pgresult;
uint64 totalRejected = 0;
for (dispatchResult = cdbdisp_resultBegin(results, -1);
dispatchResult < resultEnd; ++dispatchResult)
{
pgresult = cdbdisp_getPGresult(dispatchResult, dispatchResult->okindex);
if (pgresult && !dispatchResult->errcode)
{
/*
* add num rows rejected from this QE to the total
*/
totalRejected += dispatchResult->numrowsrejected;
}
}
if (totalRejected > 0)
ReportSrehResults(NULL, totalRejected);
}
/*
* Return ptr to first resultArray entry for a given sliceIndex.
*/
CdbDispatchResult *
cdbdisp_resultBegin(CdbDispatchResults *results, int sliceIndex)
{
CdbDispatchResults_SliceInfo *si;
if (!results)
return NULL;
if (sliceIndex < 0)
return &results->resultArray[0];
Assert(sliceIndex < results->sliceCapacity);
si = &results->sliceMap[sliceIndex];
Assert(si->resultBegin >= 0 &&
si->resultBegin <= si->resultEnd &&
si->resultEnd <= results->resultCount);
return &results->resultArray[si->resultBegin];
}
/*
* Return ptr to last+1 resultArray entry for a given sliceIndex.
*/
CdbDispatchResult *
cdbdisp_resultEnd(CdbDispatchResults *results, int sliceIndex)
{
CdbDispatchResults_SliceInfo *si;
if (!results)
return NULL;
if (sliceIndex < 0)
return &results->resultArray[results->resultCount];
si = &results->sliceMap[sliceIndex];
return &results->resultArray[si->resultEnd];
}
void
cdbdisp_returnResults(CdbDispatchResults *primaryResults, CdbPgResults *cdb_pgresults)
{
CdbDispatchResult *dispatchResult;
int nslots;
int nresults = 0;
int i;
if (!primaryResults || !cdb_pgresults)
return;
/*
* Allocate result set ptr array. The caller must PQclear() each PGresult
* and free() the array.
*/
nslots = 0;
for (i = 0; i < primaryResults->resultCount; ++i)
nslots += cdbdisp_numPGresult(&primaryResults->resultArray[i]);
cdb_pgresults->pg_results = (struct pg_result **) palloc0(nslots * sizeof(struct pg_result *));
/*
* Collect results from primary gang.
*/
for (i = 0; i < primaryResults->resultCount; ++i)
{
dispatchResult = &primaryResults->resultArray[i];
/*
* Take ownership of this QE's PGresult object(s).
*/
nresults += cdbdisp_snatchPGresults(dispatchResult,
cdb_pgresults->pg_results + nresults,
nslots - nresults);
}
Assert(nresults == nslots);
/* tell the caller how many sets we're returning. */
cdb_pgresults->numResults = nresults;
}
/*
* used in the interconnect on the dispatcher to avoid error-cleanup deadlocks.
*/
bool
cdbdisp_checkResultsErrcode(struct CdbDispatchResults *meleeResults)
{
if (meleeResults == NULL)
return false;
if (meleeResults->errcode)
return true;
return false;
}
/*
* cdbdisp_makeDispatchResults:
* Allocates a CdbDispatchResults object in the current memory context.
* Will be freed in function cdbdisp_destroyDispatcherState by deleting the
* memory context.
*/
void
cdbdisp_makeDispatchResults(CdbDispatcherState *ds,
int sliceCapacity,
bool cancelOnError)
{
CdbDispatchResults *results;
MemoryContext oldContext;
int resultCapacity;
int nbytes;
Assert(DispatcherContext);
oldContext = MemoryContextSwitchTo(DispatcherContext);
resultCapacity = ds->largestGangSize * sliceCapacity;
nbytes = resultCapacity * sizeof(results->resultArray[0]);
results = palloc0(sizeof(*results));
results->resultArray = palloc0(nbytes);
results->resultCapacity = resultCapacity;
results->resultCount = 0;
results->iFirstError = -1;
results->errcode = 0;
results->cancelOnError = cancelOnError;
results->sliceMap = NULL;
results->sliceCapacity = sliceCapacity;
if (sliceCapacity > 0)
{
nbytes = sliceCapacity * sizeof(results->sliceMap[0]);
results->sliceMap = palloc0(nbytes);
}
MemoryContextSwitchTo(oldContext);
ds->primaryResults = results;
}
void
cdbdisp_clearCdbPgResults(CdbPgResults *cdb_pgresults)
{
int i = 0;
if (!cdb_pgresults)
return;
for (i = 0; i < cdb_pgresults->numResults; i++)
PQclear(cdb_pgresults->pg_results[i]);
if (cdb_pgresults->pg_results)
{
pfree(cdb_pgresults->pg_results);
cdb_pgresults->pg_results = NULL;
}
cdb_pgresults->numResults = 0;
}
/*
* Remove all of the PGresult ptrs from a CdbDispatchResult object
* and place them into an array provided by the caller. The caller
* becomes responsible for PQclear()ing them. Returns the number of
* PGresult ptrs placed in the array.
*/
static int
cdbdisp_snatchPGresults(CdbDispatchResult *dispatchResult,
struct pg_result **pgresultptrs, int maxresults)
{
PQExpBuffer buf = dispatchResult->resultbuf;
PGresult **begp = (PGresult **) buf->data;
PGresult **endp = (PGresult **) (buf->data + buf->len);
PGresult **p;
int nresults = 0;
/*
* Snatch the PGresult objects.
*/
for (p = begp; p < endp; ++p)
{
Assert(*p != NULL);
Assert(nresults < maxresults);
pgresultptrs[nresults++] = *p;
*p = NULL;
}
/*
* Empty our PGresult array.
*/
resetPQExpBuffer(buf);
dispatchResult->okindex = -1;
return nresults;
}
相关信息
相关文章
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦