greenplumn resscheduler 源码
greenplumn resscheduler 代码
文件路径:/src/backend/utils/resscheduler/resscheduler.c
/*-------------------------------------------------------------------------
*
* resscheduler.c
* POSTGRES resource scheduling management code.
*
*
* Portions Copyright (c) 2006-2010, Greenplum inc.
* Portions Copyright (c) 2012-Present VMware, Inc. or its affiliates.
* Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
* src/backend/utils/resscheduler/resscheduler.c
*
*
-------------------------------------------------------------------------
*/
#include "postgres.h"
#include <math.h>
#include "access/genam.h"
#include "access/heapam.h"
#include "access/xact.h"
#include "catalog/indexing.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_resqueue.h"
#include "catalog/pg_resqueuecapability.h"
#include "cdb/cdbllize.h"
#include "cdb/cdbvars.h"
#include "cdb/memquota.h"
#include "executor/executor.h"
#include "miscadmin.h"
#include "nodes/print.h"
#include "optimizer/planner.h"
#include "pgstat.h"
#include "rewrite/rewriteHandler.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "storage/lmgr.h"
#include "tcop/tcopprot.h"
#include "tcop/pquery.h"
#include "tcop/utility.h"
#include "utils/guc.h"
#include "utils/fmgroids.h"
#include "utils/memutils.h"
#include "utils/resource_manager.h"
#include "utils/resscheduler.h"
#include "utils/syscache.h"
#include "utils/metrics_utils.h"
/*
* GUC variables.
*/
int gp_resqueue_memory_policy = RESMANAGER_MEMORY_POLICY_NONE;
bool gp_log_resqueue_memory = false;
int gp_resqueue_memory_policy_auto_fixed_mem;
bool gp_resqueue_print_operator_memory_limits = false;
int MaxResourceQueues; /* Max # of queues. */
int MaxResourcePortalsPerXact; /* Max # tracked portals -
* per backend . */
bool ResourceSelectOnly; /* Only lock SELECT/DECLARE? */
bool ResourceCleanupIdleGangs; /* Cleanup idle gangs? */
/*
* Global variables
*/
ResSchedulerData *ResScheduler; /* Resource Scheduler (shared) data .*/
static Oid MyQueueId = InvalidOid; /* resource queue for current role. */
static bool MyQueueIdIsValid = false; /* Is MyQueueId valid? */
static uint32 portalId = 0; /* id of portal, for tracking cursors. */
static int32 numHoldPortals = 0; /* # of holdable cursors tracked. */
/*
* ResSchedulerShmemSize -- estimate size the scheduler structures will need in
* shared memory.
*
*/
Size
ResSchedulerShmemSize(void)
{
Size size;
/* The hash of queues.*/
size = hash_estimate_size(MaxResourceQueues, sizeof(ResQueueData));
/* The scheduler structure. */
size = add_size(size, sizeof(ResSchedulerData));
/* Add a safety margin */
size = add_size(size, size / 10);
return size;
}
/*
* ResPortalIncrementShmemSize -- Estimate the size of the increment hash.
*
* Notes
* We allow one extra slot for unnamed portals, as it is simpler to not
* count them at all.
*/
Size
ResPortalIncrementShmemSize(void)
{
Size size;
long max_table_size = (MaxResourcePortalsPerXact + 1) * MaxBackends;
size = hash_estimate_size(max_table_size, sizeof(ResPortalIncrement));
/* Add a safety margin */
size = add_size(size, size / 10);
return size;
}
/*
* InitResScheduler -- initialize the scheduler queues hash in shared memory.
*
* The queuek hash table has no data in it as yet (InitResQueues cannot be
* called until catalog access is available.
*
*/
void
InitResScheduler(void)
{
bool found;
/* Create the scheduler structure. */
ResScheduler = (ResSchedulerData *)
ShmemInitStruct("Resource Scheduler Data",
sizeof(ResSchedulerData),
&found);
/* Specify that we have no initialized queues yet. */
ResScheduler->num_queues = 0;
if (found)
return; /* We are already initialized. */
/* Create the resource queue hash (empty at this point). */
found = ResQueueHashTableInit();
if (!found)
ereport(FATAL,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("not enough shared memory for resource scheduler")));
elog(DEBUG1, "initialized resource scheduler");
return;
}
/*
* InitResPortalIncrementHash - Initialize the portal increment hash.
*
* Notes:
* Simply calls the internal initialization function.
*/
void
InitResPortalIncrementHash(void)
{
bool result;
result = ResPortalIncrementHashTableInit();
if (!result)
elog(FATAL, "could not initialize portal increment hash");
elog(DEBUG1, "initialized portal increment hash");
}
/*
* InitResQueues -- initialize the resource queues in shared memory. Note this
* can only be done after enough setup has been done. This uses
* heap_open etc which in turn requires shared memory to be set up.
*/
void
InitResQueues(void)
{
HeapTuple tuple;
int numQueues = 0;
bool queuesok = true;
SysScanDesc sscan;
Assert(ResScheduler);
/**
* The resqueue shared mem initialization must be serialized. Only the first session
* should do the init.
* Serialization is done the ResQueueLock LW_EXCLUSIVE. However, we must obtain all DB
* lock before obtaining LWlock.
* So, we must have obtained ResQueueRelationId and ResQueueCapabilityRelationId lock
* first.
*/
/* XXX XXX: should this be rowexclusive ? */
Relation relResqueue = heap_open(ResQueueRelationId, AccessShareLock);
LockRelationOid(ResQueueCapabilityRelationId, RowExclusiveLock);
LWLockAcquire(ResQueueLock, LW_EXCLUSIVE);
if (ResScheduler->num_queues > 0)
{
/* Hash table has already been loaded */
LWLockRelease(ResQueueLock);
UnlockRelationOid(ResQueueCapabilityRelationId, RowExclusiveLock);
heap_close(relResqueue, AccessShareLock);
return;
}
sscan = systable_beginscan(relResqueue, InvalidOid, false, NULL, 0, NULL);
while (HeapTupleIsValid(tuple = systable_getnext(sscan)))
{
Form_pg_resqueue queueform;
Oid queueid;
bool overcommit;
float4 ignorelimit;
Cost thresholds[NUM_RES_LIMIT_TYPES];
char *queuename;
numQueues++;
queueform = (Form_pg_resqueue) GETSTRUCT(tuple);
queueid = queueform->oid;
queuename = NameStr(queueform->rsqname);
thresholds[RES_COUNT_LIMIT] = queueform->rsqcountlimit;
thresholds[RES_COST_LIMIT] = queueform->rsqcostlimit;
thresholds[RES_MEMORY_LIMIT] = ResourceQueueGetMemoryLimit(queueid);
overcommit = queueform->rsqovercommit;
ignorelimit = queueform->rsqignorecostlimit;
queuesok = ResCreateQueue(queueid, thresholds, overcommit, ignorelimit);
if (!queuesok)
{
/** Break out of loop. Close relations, relinquish LWLock and then error out */
break;
}
}
systable_endscan(sscan);
LWLockRelease(ResQueueLock);
UnlockRelationOid(ResQueueCapabilityRelationId, RowExclusiveLock);
heap_close(relResqueue, AccessShareLock);
if (!queuesok)
ereport(PANIC,
(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
errmsg("insufficient resource queues available"),
errhint("Increase max_resource_queues to %d.", numQueues)));
elog(LOG,"initialized %d resource queues", numQueues);
return;
}
/*
* ResCreateQueue -- initialize the elements for a resource queue.
*
* Notes:
* It is expected that the appropriate lightweight lock is held before
* calling this - unless we are the startup process.
*/
bool
ResCreateQueue(Oid queueid, Cost limits[NUM_RES_LIMIT_TYPES], bool overcommit,
float4 ignorelimit)
{
ResQueue queue;
int i;
Assert(LWLockHeldByMeInMode(ResQueueLock, LW_EXCLUSIVE));
/* If the new queue pointer is NULL, then we are out of queues. */
if (ResScheduler->num_queues >= MaxResourceQueues)
return false;
/**
* Has an entry for this
*/
queue = ResQueueHashNew(queueid);
if (!queue)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of shared memory"),
errhint("You may need to increase max_resource_queues.")));
/* Set queue oid and offset in the scheduler array */
queue->queueid = queueid;
/* Set the number of limits 0 initially. */
queue->num_limits = 0;
/* Set overcommit.*/
queue->overcommit = overcommit;
/* Set ignore cost limit. */
queue->ignorecostlimit = ignorelimit;
/* Now run through all the possible limit types.*/
for (i = 0 ; i < NUM_RES_LIMIT_TYPES; i++)
{
/*
* Specific initializations for the limit types (the current two are
* in fact the same).
*/
switch (i)
{
case RES_COUNT_LIMIT:
case RES_COST_LIMIT:
case RES_MEMORY_LIMIT:
{
queue->num_limits++;
queue->limits[i].type = i;
queue->limits[i].threshold_value = limits[i];
queue->limits[i].current_value = 0;
queue->limits[i].threshold_is_max = true;
break;
}
default:
elog(ERROR, "unknown resource limit type %d", i);
break;
}
}
ResScheduler->num_queues++;
return true;
}
/*
* ResAlterQueue -- Change thresholds, overcommit for a resource queue.
*
* Notes:
* It is expected that the appropriate lightweight lock is held before
* calling this.
*/
ResAlterQueueResult
ResAlterQueue(Oid queueid, Cost limits[NUM_RES_LIMIT_TYPES], bool overcommit,
float4 ignorelimit)
{
ResQueue queue = ResQueueHashFind(queueid);
bool result = ALTERQUEUE_OK;
int i;
/* Handed an Oid for something other than a queue, bail out! */
if (queue == NULL)
return ALTERQUEUE_ERROR;
/*
* Now run through all the possible limit types, checking all the
* intended changes are ok.
*/
for (i = 0 ; i < NUM_RES_LIMIT_TYPES; i++)
{
/* Don't sanity check thresholds if that are about to be disabled. */
if (limits[i] == INVALID_RES_LIMIT_THRESHOLD)
continue;
/*
* MPP-4340:
* The goal of the resource queues is to allow us to throttle work
* onto the cluster: a key requirement is that given a queue we be able to
* change the limit values -- especially in a situation where we're already
* overloading the system, we'd like to be able to drop the queue limit.
*
* The original sanity checks here disallowed modifications which might
* reduce the threshold. To throttle the system for better performance,
* work had to get cancelled. That's an unacceptable limitation; so
* now we allow the queues to be changed.
*
* Overcommit is a much more serious type of change.
*
* For overcommitable limits we need to ensure the queue is not in
* an overcommited state if we about to turn overcommit off.
*/
switch (i)
{
case RES_COUNT_LIMIT:
break;
case RES_COST_LIMIT:
{
/*
* Don't turn overcommit off if queue could be overcommitted.
* We err on the side of caution in this case and demand that
* the queue is idle - we don't want to leave a query waiting
* forever in the wait queue. We use the same roundoff limit
* value (0.1) as ResLockCheckLimit does.
*/
if (!overcommit && queue->overcommit &&
(queue->limits[i].current_value > 0.1))
result = ALTERQUEUE_OVERCOMMITTED;
}
break;
case RES_MEMORY_LIMIT:
break;
}
}
/*
* If threshold and overcommit alterations are all ok, do the changes.
*/
if (result == ALTERQUEUE_OK)
{
for (i = 0 ; i < NUM_RES_LIMIT_TYPES; i++)
{
/*
* Assign the new thresholds.
*/
switch (i)
{
case RES_COUNT_LIMIT:
case RES_COST_LIMIT:
case RES_MEMORY_LIMIT:
{
if (queue->limits[i].threshold_value != limits[i])
queue->limits[i].threshold_value = limits[i];
}
break;
}
}
/* Set overcommit if that has changed. */
if (queue->overcommit != overcommit)
queue->overcommit = overcommit;
/* Set ignore cost limit if that has changed. */
if (queue->ignorecostlimit != ignorelimit)
queue->ignorecostlimit = ignorelimit;
}
return result;
}
/*
* ResDestroyQueue -- destroy a resource queue.
*
* Notes:
* It is expected that the appropriate lightweight lock is held before
* calling this.
*/
bool
ResDestroyQueue(Oid queueid)
{
ResQueue queue = ResQueueHashFind(queueid);
bool allzero = true;
bool result;
int i;
/* Handed an Oid for something other than a queue, bail out! */
if (queue == NULL)
return false;
/*
* Now run through all the possible limit types, checking all the
* current counters are zero.
*/
for (i = 0 ; i < NUM_RES_LIMIT_TYPES; i++)
{
/*
* Specific checks for the limit types (the current two are
* in fact the same).
*/
switch (i)
{
case RES_COUNT_LIMIT:
case RES_COST_LIMIT:
case RES_MEMORY_LIMIT:
{
if (queue->limits[i].current_value != 0)
allzero = false;
}
break;
}
}
/*
* No non-zero current values for the queue, are able to destroy it.
*
* Some care is needed, as this leaves the possibility of a connected
* user still using this queue via a ALTER ROLE ... RESOURCE QUEUE while
* he is connected. However this is no worse than being able to DROP a
* ROLE that is corrently connected!
*/
if (allzero)
{
result = ResQueueHashRemove(queueid);
if (result)
ResScheduler->num_queues--;
return result;
}
else
{
return false;
}
}
/*
* ResLockPortal -- get a resource lock for Portal execution.
*/
void
ResLockPortal(Portal portal, QueryDesc *qDesc)
{
bool shouldReleaseLock = false; /* Release resource lock? */
bool takeLock; /* Take resource lock? */
LOCKTAG tag;
Oid queueid;
int32 lockResult = 0;
ResPortalIncrement incData;
Plan *plan = NULL;
Assert(qDesc);
Assert(qDesc->plannedstmt);
plan = qDesc->plannedstmt->planTree;
portal->status = PORTAL_QUEUE;
queueid = portal->queueId;
/*
* Check we have a valid queue before going any further.
*/
if (queueid != InvalidOid)
{
/*
* Check the source tag to see if the original statement is suitable for
* locking.
*/
switch (portal->sourceTag)
{
/*
* For INSERT/UPDATE/DELETE Skip if we have specified only SELECT,
* otherwise drop through to handle like a SELECT.
*/
case T_InsertStmt:
case T_DeleteStmt:
case T_UpdateStmt:
{
if (ResourceSelectOnly)
{
takeLock = false;
shouldReleaseLock = false;
break;
}
}
/* fallthrough */
case T_SelectStmt:
{
/*
* Setup the resource portal increments, ready to be added.
*/
incData.pid = MyProc->pid;
incData.portalId = portal->portalId;
incData.increments[RES_COUNT_LIMIT] = 1;
incData.increments[RES_COST_LIMIT] = ceil(plan->total_cost);
if (!IsResManagerMemoryPolicyNone())
{
Assert(IsResManagerMemoryPolicyAuto() ||
IsResManagerMemoryPolicyEagerFree());
uint64 queryMemory = qDesc->plannedstmt->query_mem;
Assert(queryMemory > 0);
if (LogResManagerMemory())
{
elog(GP_RESMANAGER_MEMORY_LOG_LEVEL, "query requested %.0fKB", (double) queryMemory / 1024.0);
}
incData.increments[RES_MEMORY_LIMIT] = (Cost) queryMemory;
}
else
{
Assert(IsResManagerMemoryPolicyNone());
incData.increments[RES_MEMORY_LIMIT] = (Cost) 0.0;
}
takeLock = true;
shouldReleaseLock = true;
}
break;
/*
* We are declaring a cursor - do the same as T_SelectStmt, but
* need to additionally consider setting the isHold option.
*/
case T_DeclareCursorStmt:
{
/*
* Setup the resource portal increments, ready to be added.
*/
incData.pid = MyProc->pid;
incData.portalId = portal->portalId;
incData.increments[RES_COUNT_LIMIT] = 1;
incData.increments[RES_COST_LIMIT] = ceil(plan->total_cost);
incData.isHold = portal->cursorOptions & CURSOR_OPT_HOLD;
if (!IsResManagerMemoryPolicyNone())
{
Assert(IsResManagerMemoryPolicyAuto() ||
IsResManagerMemoryPolicyEagerFree());
uint64 queryMemory = qDesc->plannedstmt->query_mem;
Assert(queryMemory > 0);
if (LogResManagerMemory())
{
elog(NOTICE, "query requested %.0fKB", (double) queryMemory / 1024.0);
}
incData.increments[RES_MEMORY_LIMIT] = (Cost) queryMemory;
}
else
{
Assert(IsResManagerMemoryPolicyNone());
incData.increments[RES_MEMORY_LIMIT] = (Cost) 0.0;
}
takeLock = true;
shouldReleaseLock = true;
}
break;
/*
* We do not want to lock any of these query types.
*/
default:
{
takeLock = false;
shouldReleaseLock = false;
}
break;
}
/*
* Get the resource lock.
*/
if (takeLock)
{
#ifdef RESLOCK_DEBUG
elog(DEBUG1, "acquire resource lock for queue %u (portal %u)",
queueid, portal->portalId);
#endif
SET_LOCKTAG_RESOURCE_QUEUE(tag, queueid);
PG_TRY();
{
lockResult = ResLockAcquire(&tag, &incData);
}
PG_CATCH();
{
/*
* We might have been waiting for a resource queue lock when we get
* here. Calling ResLockRelease without calling ResLockWaitCancel will
* cause the locallock to be cleaned up, but will leave the global
* variable lockAwaited still pointing to the locallock hash
* entry.
*/
ResLockWaitCancel();
/* If we had acquired the resource queue lock, release it and clean up */
ResLockRelease(&tag, portal->portalId);
/* GPDB hook for collecting query info */
if (query_info_collect_hook)
(*query_info_collect_hook)(METRICS_QUERY_ERROR, qDesc);
portal->queueId = InvalidOid;
portal->portalId = INVALID_PORTALID;
PG_RE_THROW();
}
PG_END_TRY();
/*
* See if query was too small to bother locking at all, i.e had
* cost smaller than the ignore cost threshold for the queue.
*/
if (lockResult == LOCKACQUIRE_NOT_AVAIL)
{
#ifdef RESLOCK_DEBUG
elog(DEBUG1, "cancel resource lock for queue %u (portal %u)",
queueid, portal->portalId);
#endif
/*
* Reset portalId and queueid for this portal so the queue
* and increment accounting tests continue to work properly.
*/
portal->queueId = InvalidOid;
portal->portalId = INVALID_PORTALID;
shouldReleaseLock = false;
}
/* Count holdable cursors (if we are locking this one) .*/
if (portal->cursorOptions & CURSOR_OPT_HOLD && shouldReleaseLock)
numHoldPortals++;
}
}
portal->hasResQueueLock = shouldReleaseLock;
}
/* This function is a simple version of ResLockPortal, which is used specially
* for utility statements; the main logic is same as ResLockPortal, but remove
* some unnecessary lines and make some tiny adjustments for utility stmts */
void
ResLockUtilityPortal(Portal portal, float4 ignoreCostLimit)
{
bool shouldReleaseLock = false;
LOCKTAG tag;
Oid queueid;
int32 lockResult = 0;
ResPortalIncrement incData;
queueid = portal->queueId;
/*
* Check we have a valid queue before going any further.
*/
if (queueid != InvalidOid)
{
/*
* Setup the resource portal increments, ready to be added.
*/
incData.pid = MyProc->pid;
incData.portalId = portal->portalId;
incData.increments[RES_COUNT_LIMIT] = 1;
incData.increments[RES_COST_LIMIT] = ignoreCostLimit;
incData.increments[RES_MEMORY_LIMIT] = (Cost) 0.0;
shouldReleaseLock = true;
/*
* Get the resource lock.
*/
#ifdef RESLOCK_DEBUG
elog(DEBUG1, "acquire resource lock for queue %u (portal %u)",
queueid, portal->portalId);
#endif
SET_LOCKTAG_RESOURCE_QUEUE(tag, queueid);
PG_TRY();
{
lockResult = ResLockAcquire(&tag, &incData);
}
PG_CATCH();
{
/*
* We might have been waiting for a resource queue lock when we get
* here. Calling ResLockRelease without calling ResLockWaitCancel will
* cause the locallock to be cleaned up, but will leave the global
* variable lockAwaited still pointing to the locallock hash
* entry.
*/
ResLockWaitCancel();
/* If we had acquired the resource queue lock, release it and clean up */
ResLockRelease(&tag, portal->portalId);
/*
* Clean up if we got cancelled while waiting.
*/
portal->queueId = InvalidOid;
portal->portalId = INVALID_PORTALID;
PG_RE_THROW();
}
PG_END_TRY();
}
portal->hasResQueueLock = shouldReleaseLock;
}
/*
* ResUnLockPortal -- release a resource lock for a portal.
*/
void
ResUnLockPortal(Portal portal)
{
LOCKTAG tag;
Oid queueid;
queueid = portal->queueId;
/*
* Check we have a valid queue before going any further.
*/
if (queueid != InvalidOid)
{
#ifdef RESLOCK_DEBUG
elog(DEBUG1, "release resource lock for queue %u (portal %u)",
queueid, portal->portalId);
#endif
SET_LOCKTAG_RESOURCE_QUEUE(tag, queueid);
ResLockRelease(&tag, portal->portalId);
/* Count holdable cursors.*/
if (portal->cursorOptions & CURSOR_OPT_HOLD)
{
Assert(numHoldPortals > 0);
numHoldPortals--;
}
}
portal->hasResQueueLock = false;
return;
}
/*
* GetResQueueForRole -- determine what resource queue a role is going to use.
*
* Notes
* This could be called for each of ResLockPortal and ResUnLockPortal, but we
* can eliminate a relation open and lock if it is cached.
*/
Oid
GetResQueueForRole(Oid roleid)
{
HeapTuple tuple;
bool isnull;
Oid queueid;
tuple = SearchSysCache1(AUTHOID, ObjectIdGetDatum(roleid));
if (!tuple)
return DEFAULTRESQUEUE_OID; /* role not found */
queueid = SysCacheGetAttr(AUTHOID, tuple, Anum_pg_authid_rolresqueue, &isnull);
/* MPP-6926: use default queue if none specified */
if (!OidIsValid(queueid) || isnull)
queueid = DEFAULTRESQUEUE_OID;
ReleaseSysCache(tuple);
return queueid;
}
/*
* SetResQueueId -- set the cached value for the current resource queue.
*
* Notes
* Needs to be called at session initialization and after (or in) SET ROLE.
*/
void
SetResQueueId(void)
{
MyQueueId = InvalidOid;
MyQueueIdIsValid = false;
}
/*
* GetResQueueId -- return the current cached value for the resource queue.
*/
Oid
GetResQueueId(void)
{
if (!MyQueueIdIsValid)
{
/*
* GPDB_94_MERGE_FIXME: cannot do catalog lookups, if we're not in a
* transaction. Just play dumb, then. Arguably, abort processing
* shouldn't be governed by resource queues, anyway.
*/
if (!IsTransactionState())
return InvalidOid;
MyQueueId = GetResQueueForRole(GetUserId());
MyQueueIdIsValid = true;
}
return MyQueueId;
}
/*
* ResQueueIdForName -- Return the Oid for a resource queue name
*
* Notes:
* Used by the various admin commands to convert a user supplied queue name
* to Oid.
*/
Oid
GetResQueueIdForName(char *name)
{
Relation rel;
ScanKeyData scankey;
SysScanDesc scan;
HeapTuple tuple;
Oid queueid;
rel = heap_open(ResQueueRelationId, AccessShareLock);
/* SELECT oid FROM pg_resqueue WHERE rsqname = :1 */
ScanKeyInit(&scankey,
Anum_pg_resqueue_rsqname,
BTEqualStrategyNumber, F_NAMEEQ,
CStringGetDatum(name));
scan = systable_beginscan(rel, ResQueueRsqnameIndexId, true,
NULL, 1, &scankey);
tuple = systable_getnext(scan);
if (tuple)
queueid = ((Form_pg_resqueue) GETSTRUCT(tuple))->oid;
else
queueid = InvalidOid;
systable_endscan(scan);
heap_close(rel, AccessShareLock);
return queueid;
}
/*
* ResCreatePortalId -- return an id for a portal
*
* Notes
* We return a new id for any named portal, but zero for an un-named one.
* This is because we use the id to track cursors, and there can only be one
* un-named portal active.
*/
uint32
ResCreatePortalId(const char *name)
{
uint32 id;
ResPortalTag portalTag;
bool found = false;
/* Unnamed portal. */
if (name[0] == '\0')
{
id = 0;
return id;
}
LWLockAcquire(ResQueueLock, LW_EXCLUSIVE);
/*
* Named portal.
* Search for a id we can re-use from a closed portal - we want to do this
* so we can have MaxResourcePortalsPerXact *open* tracked (named) portals
* in a transaction.
*/
for (id = 1; id <= portalId; id++)
{
MemSet(&portalTag, 0, sizeof(ResPortalTag));
portalTag.pid = MyProc->pid;
portalTag.portalId = id;
if (!ResIncrementFind(&portalTag))
{
found = true;
break;
}
}
LWLockRelease(ResQueueLock);
/*
* No re-usable portal ids, check we have not exhaused the # of trackable
* portals and return the next id.
*/
if (!found)
{
if (portalId >= MaxResourcePortalsPerXact)
{
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
errmsg("insufficient portal ids available"),
errhint("Increase max_resource_portals_per_transaction.")));
}
id = (++portalId);
}
return id;
}
/*
* AtCommit_ResScheduler -- do any pre-commit processing for Resource
* Scheduling.
*/
void
AtCommit_ResScheduler(void)
{
/* reset the portal id. */
if (numHoldPortals == 0)
portalId = 0;
}
/*
* AtAbort_ResScheduler -- do any abort processing for Resource
* Scheduling.
*/
void
AtAbort_ResScheduler(void)
{
/* reset the portal id. */
if (numHoldPortals == 0)
portalId = 0;
}
/* This routine checks whether specified utility stmt should be involved into
* resource queue mgmt; if yes, take the slot from the resource queue; if we
* want to track additional utility stmts, add it into the condition check */
void
ResHandleUtilityStmt(Portal portal, Node *stmt)
{
if (!IsA(stmt, CopyStmt) && !IsA(stmt, CreateTableAsStmt))
{
return;
}
if (Gp_role == GP_ROLE_DISPATCH
&& IsResQueueEnabled()
&& (!ResourceSelectOnly)
&& !superuser())
{
Assert(!LWLockHeldByMeInMode(ResQueueLock, LW_EXCLUSIVE));
LWLockAcquire(ResQueueLock, LW_EXCLUSIVE);
ResQueue resQueue = ResQueueHashFind(portal->queueId);
LWLockRelease(ResQueueLock);
Assert(resQueue);
int numSlots = (int) ceil(resQueue->limits[RES_COUNT_LIMIT].threshold_value);
if (numSlots >= 1) /* statement limit exists */
{
portal->status = PORTAL_QUEUE;
ResLockUtilityPortal(portal, resQueue->ignorecostlimit);
}
portal->status = PORTAL_ACTIVE;
}
}
相关信息
相关文章
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦