Greenplum resqueue source code
File path: /src/backend/utils/resscheduler/resqueue.c
/*-------------------------------------------------------------------------
*
* resqueue.c
* POSTGRES internals code for resource queues and locks.
*
*
* Portions Copyright (c) 2006-2008, Greenplum inc.
* Portions Copyright (c) 2012-Present VMware, Inc. or its affiliates.
* Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
* src/backend/utils/resscheduler/resqueue.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include <math.h>
#include <time.h>
#include "pgstat.h"
#include "access/heapam.h"
#include "access/twophase.h"
#include "access/twophase_rmgr.h"
#include "access/xact.h"
#include "catalog/pg_resourcetype.h"
#include "catalog/pg_resqueue.h"
#include "catalog/pg_type.h"
#include "cdb/cdbgang.h"
#include "cdb/cdbvars.h"
#include "common/hashfn.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "storage/lock.h"
#include "storage/lmgr.h"
#include "utils/builtins.h"
#include "utils/guc_tables.h"
#include "utils/memutils.h"
#include "utils/portal.h"
#include "utils/ps_status.h"
#include "utils/resowner.h"
#include "utils/resource_manager.h"
#include "utils/resscheduler.h"
#include "cdb/memquota.h"
#include "commands/queue.h"
#include "storage/proc.h"
static void ResCleanUpLock(LOCK *lock, PROCLOCK *proclock, uint32 hashcode, bool wakeupNeeded);
static ResPortalIncrement *ResIncrementAdd(ResPortalIncrement *incSet,
PROCLOCK *proclock,
ResourceOwner owner,
ResIncrementAddStatus *status);
static bool ResIncrementRemove(ResPortalTag *portaltag);
static void ResWaitOnLock(LOCALLOCK *locallock, ResourceOwner owner, ResPortalIncrement *incrementSet);
static void ResLockUpdateLimit(LOCK *lock, PROCLOCK *proclock, ResPortalIncrement *incrementSet, bool increment, bool inError);
static void ResGrantLock(LOCK *lock, PROCLOCK *proclock);
static bool ResUnGrantLock(LOCK *lock, PROCLOCK *proclock);
static uint64 ResourceQueueGetSuperuserQueryMemoryLimit(void);
/*
* Global Variables
*/
static HTAB *ResPortalIncrementHash; /* Hash of resource increments. */
static HTAB *ResQueueHash; /* Hash of resource queues. */
/*
* Record structure holding the per-queue data to be exposed, used by
* pg_resqueue_status().
*/
typedef struct
{
Oid queueid;
float4 queuecountthreshold;
float4 queuecostthreshold;
float4 queuememthreshold;
float4 queuecountvalue;
float4 queuecostvalue;
float4 queuememvalue;
int queuewaiters;
int queueholders;
} QueueStatusRec;
/*
* Function context for data persisting over repeated calls, used by
* pg_resqueue_status().
*/
typedef struct
{
QueueStatusRec *record;
int numRecords;
} QueueStatusContext;
static void BuildQueueStatusContext(QueueStatusContext *fctx);
/*
* ResLockAcquire -- acquire a resource lock.
*
* Notes and criticisms:
*
* Returns LOCKACQUIRE_OK if we get the lock,
* LOCKACQUIRE_NOT_AVAIL if we don't want to take the lock after all.
*
* Analogous to LockAcquire, but the lockmode and session boolean are not
* required in the function prototype as we are *always* lockmode ExclusiveLock
* and have no session locks.
*
* The semantics of resource locks mean that lockmode has minimal meaning -
* the conflict rules are determined by the state of the counters of the
* corresponding queue. We are maintaining the lock lockmode and related
* elements (holdmask etc), in order to ease comparison with standard locks
* at deadlock check time (well, so we hope anyway.)
*
* The "locktag" here consists of the queue-id and the "lockmethod" of
* "resource-queue" and an identifier specifying that this is a
* resource-locktag.
*
*/
LockAcquireResult
ResLockAcquire(LOCKTAG *locktag, ResPortalIncrement *incrementSet)
{
LOCKMODE lockmode = ExclusiveLock;
LOCK *lock;
PROCLOCK *proclock;
PROCLOCKTAG proclocktag;
LOCALLOCKTAG localtag;
LOCALLOCK *locallock;
uint32 hashcode;
uint32 proclock_hashcode;
int partition;
LWLockId partitionLock;
bool found;
ResourceOwner owner;
ResQueue queue;
int status;
ResIncrementAddStatus addStatus;
/* Setup the lock method bits. */
Assert(locktag->locktag_lockmethodid == RESOURCE_LOCKMETHOD);
/* Provide a resource owner. */
owner = CurrentResourceOwner;
/*
* Find or create a LOCALLOCK entry for this lock and lockmode
*/
MemSet(&localtag, 0, sizeof(localtag)); /* must clear padding */
localtag.lock = *locktag;
localtag.mode = lockmode;
locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash,
(void *) &localtag,
HASH_ENTER, &found);
/*
* if it's a new locallock object, initialize it; if it already exists
* then that is enough for the resource locks.
*/
if (!found)
{
locallock->lock = NULL;
locallock->proclock = NULL;
locallock->hashcode = LockTagHashCode(&(localtag.lock));
locallock->istemptable = false;
locallock->nLocks = 0;
locallock->numLockOwners = 0;
locallock->maxLockOwners = 8;
locallock->holdsStrongLockCount = false;
locallock->lockCleared = false;
locallock->lockOwners = NULL;
locallock->lockOwners = (LOCALLOCKOWNER *)
MemoryContextAlloc(TopMemoryContext, locallock->maxLockOwners * sizeof(LOCALLOCKOWNER));
}
/* We are going to examine the shared lock table. */
hashcode = locallock->hashcode;
partition = LockHashPartition(hashcode);
partitionLock = LockHashPartitionLock(hashcode);
LWLockAcquire(partitionLock, LW_EXCLUSIVE);
/*
* Find or create a lock with this tag.
*/
lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
(void *) locktag,
hashcode,
HASH_ENTER_NULL,
&found);
locallock->lock = lock;
if (!lock)
{
LWLockRelease(partitionLock);
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of shared memory"),
errhint("You may need to increase max_resource_queues.")));
}
/*
* if it's a new lock object, initialize it.
*/
if (!found)
{
lock->grantMask = 0;
lock->waitMask = 0;
SHMQueueInit(&(lock->procLocks));
ProcQueueInit(&(lock->waitProcs));
lock->nRequested = 0;
lock->nGranted = 0;
MemSet(lock->requested, 0, sizeof(int) * MAX_LOCKMODES);
MemSet(lock->granted, 0, sizeof(int) * MAX_LOCKMODES);
}
else
{
Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
Assert(lock->nGranted <= lock->nRequested);
}
/*
* Create the hash key for the proclock table.
*/
MemSet(&proclocktag, 0, sizeof(PROCLOCKTAG)); /* Clear padding. */
proclocktag.myLock = lock;
proclocktag.myProc = MyProc;
proclock_hashcode = ProcLockHashCode(&proclocktag, hashcode);
/*
* Find or create a proclock entry with this tag.
*/
proclock = (PROCLOCK *) hash_search_with_hash_value(LockMethodProcLockHash,
(void *) &proclocktag,
proclock_hashcode,
HASH_ENTER_NULL,
&found);
locallock->proclock = proclock;
if (!proclock)
{
/* Not enough shmem for the proclock. */
if (lock->nRequested == 0)
{
/*
* There are no other requestors of this lock, so garbage-collect
* the lock object. We *must* do this to avoid a permanent leak
* of shared memory, because there won't be anything to cause
* anyone to release the lock object later.
*/
Assert(SHMQueueEmpty(&(lock->procLocks)));
if (!hash_search_with_hash_value(LockMethodLockHash,
(void *) &(lock->tag),
hashcode,
HASH_REMOVE,
NULL))
elog(PANIC, "lock table corrupted");
}
LWLockRelease(partitionLock);
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of shared memory"),
errhint("You may need to increase max_resource_queues.")));
}
/*
* If new, initialize the new entry.
*/
if (!found)
{
/*
* Resource queues don't participate in "group locking", used to share
* locks between leader process and parallel worker processes in
* PostgreSQL. But we'd better still set 'groupLeader'; it is assumed
* to be valid on all PROCLOCKs, and is accessed e.g. by
* GetLockStatusData().
*/
proclock->groupLeader = MyProc->lockGroupLeader != NULL ?
MyProc->lockGroupLeader : MyProc;
proclock->holdMask = 0;
proclock->releaseMask = 0;
/* Add proclock to appropriate lists */
SHMQueueInsertBefore(&lock->procLocks, &proclock->lockLink);
SHMQueueInsertBefore(&(MyProc->myProcLocks[partition]), &proclock->procLink);
proclock->nLocks = 0;
SHMQueueInit(&(proclock->portalLinks));
}
else
{
Assert((proclock->holdMask & ~lock->grantMask) == 0);
/* Could do a deadlock risk check here. */
}
/*
* lock->nRequested and lock->requested[] count the total number of
* requests, whether granted or waiting, so increment those immediately.
* The other counts don't increment till we get the lock.
*/
lock->nRequested++;
lock->requested[lockmode]++;
Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
LWLockAcquire(ResQueueLock, LW_EXCLUSIVE);
/* Look up existing queue */
PG_TRY();
{
queue = GetResQueueFromLock(lock);
}
PG_CATCH();
{
/*
* Something went wrong - our resource queue is gone. Release all locks
* and clean up.
*/
lock->nRequested--;
lock->requested[lockmode]--;
LWLockReleaseAll();
PG_RE_THROW();
}
PG_END_TRY();
/*
* If the query cost is smaller than the ignore cost limit for this queue
* then don't try to take a lock at all.
*/
if (incrementSet->increments[RES_COST_LIMIT] < queue->ignorecostlimit)
{
/* Decrement requested. */
lock->nRequested--;
lock->requested[lockmode]--;
Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
/*
* Clean up the locallock. Since a single locallock can represent
* multiple locked portals in the same backend, we can only remove it if
* this is the last portal.
*/
if (proclock->nLocks == 0)
RemoveLocalLock(locallock);
ResCleanUpLock(lock, proclock, hashcode, false);
LWLockRelease(ResQueueLock);
LWLockRelease(partitionLock);
/*
* To avoid queue accounting problems, we will need to reset the
* queueId and portalId for this portal *after* returning from here.
*/
return LOCKACQUIRE_NOT_AVAIL;
}
/*
* Otherwise, we are going to take a lock. Add an increment to the
* increment hash for this process.
*/
incrementSet = ResIncrementAdd(incrementSet, proclock, owner, &addStatus);
if (addStatus != RES_INCREMENT_ADD_OK)
{
/*
* We have failed to add the increment. So decrement the requested
* counters, relinquish locks and raise the appropriate error.
*/
lock->nRequested--;
lock->requested[lockmode]--;
LWLockRelease(ResQueueLock);
LWLockRelease(partitionLock);
if (addStatus == RES_INCREMENT_ADD_OOSM)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of shared memory adding portal increments"),
errhint("You may need to increase max_resource_portals_per_transaction.")));
else
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("duplicate portal id %u for proc %d",
incrementSet->portalId, incrementSet->pid)));
}
/*
* Check if the lock can be acquired (i.e. whether the resource that the
* lock and queue control is not exhausted).
*/
status = ResLockCheckLimit(lock, proclock, incrementSet, true);
if (status == STATUS_ERROR)
{
/*
* The requested lock has individual increments that are larger than
* some of the thresholds for the corresponding queue, and overcommit
* is not enabled for them. So abort and clean up.
*/
ResPortalTag portalTag;
/* Adjust the counters as we no longer want this lock. */
lock->nRequested--;
lock->requested[lockmode]--;
Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
/*
* Clean up the locallock. Since a single locallock can represent
* multiple locked portals in the same backend, we can only remove it if
* this is the last portal.
*/
if (proclock->nLocks == 0)
RemoveLocalLock(locallock);
ResCleanUpLock(lock, proclock, hashcode, false);
/* Kill off the increment. */
MemSet(&portalTag, 0, sizeof(ResPortalTag));
portalTag.pid = incrementSet->pid;
portalTag.portalId = incrementSet->portalId;
ResIncrementRemove(&portalTag);
LWLockRelease(ResQueueLock);
LWLockRelease(partitionLock);
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
errmsg("statement requires more resources than resource queue allows")));
}
else if (status == STATUS_OK)
{
/*
* The requested lock will *not* exhaust the limit for this resource
* queue, so record this in the local lock hash, and grant it.
*/
ResGrantLock(lock, proclock);
ResLockUpdateLimit(lock, proclock, incrementSet, true, false);
LWLockRelease(ResQueueLock);
/* Note the start time for queue statistics. */
pgstat_record_start_queue_exec(incrementSet->portalId,
locktag->locktag_field1);
}
else
{
Assert(status == STATUS_FOUND);
/*
* The requested lock will exhaust the limit for this resource queue,
* so must wait.
*/
/* Set bitmask of locks this process already holds on this object. */
MyProc->heldLocks = proclock->holdMask; /* Do we need to do this? */
/*
* Set the portal id so we can identify what increments we want
* to apply at wakeup.
*/
MyProc->waitPortalId = incrementSet->portalId;
LWLockRelease(ResQueueLock);
/* Note count and wait time for queue statistics. */
pgstat_count_queue_wait(incrementSet->portalId,
locktag->locktag_field1);
pgstat_record_start_queue_wait(incrementSet->portalId,
locktag->locktag_field1);
/*
* Sleep till someone wakes me up.
*/
ResWaitOnLock(locallock, owner, incrementSet);
/*
* Have been awakened, check state is consistent.
*/
if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
{
LWLockRelease(partitionLock);
elog(ERROR, "ResLockAcquire failed");
}
/* Reset the portal id. */
MyProc->waitPortalId = INVALID_PORTALID;
/* End wait time and start execute time statistics for this queue. */
pgstat_record_end_queue_wait(incrementSet->portalId,
locktag->locktag_field1);
pgstat_record_start_queue_exec(incrementSet->portalId,
locktag->locktag_field1);
}
/* Release the partition lock. */
LWLockRelease(partitionLock);
return LOCKACQUIRE_OK;
}
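/*
 * Illustrative sketch (not part of the original file): a minimal caller of
 * ResLockAcquire()/ResLockRelease(). The queueId, portalId and plannedStmt
 * names are assumed to be in scope; in the real tree the resource scheduler
 * performs additional portal bookkeeping around these calls.
 *
 *     LOCKTAG            tag;
 *     ResPortalIncrement incSet;
 *
 *     MemSet(&incSet, 0, sizeof(incSet));
 *     incSet.pid = MyProc->pid;
 *     incSet.portalId = portalId;
 *     incSet.increments[RES_COUNT_LIMIT] = 1;
 *     incSet.increments[RES_COST_LIMIT] = plannedStmt->planTree->total_cost;
 *
 *     SET_LOCKTAG_RESOURCE_QUEUE(tag, queueId);
 *     if (ResLockAcquire(&tag, &incSet) == LOCKACQUIRE_OK)
 *     {
 *         (run the portal)
 *         ResLockRelease(&tag, portalId);
 *     }
 *
 * ResLockAcquire() returns LOCKACQUIRE_NOT_AVAIL when the statement's cost
 * falls below the queue's ignorecostlimit, in which case no lock is held.
 */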
/*
* ResLockRelease -- release a resource lock.
*
* The "locktag" here consists of the queue-id and the "lockmethod" of
* "resource-queue" and an identifier specifying that this is a
* resource-locktag.
*/
bool
ResLockRelease(LOCKTAG *locktag, uint32 resPortalId)
{
LOCKMODE lockmode = ExclusiveLock;
LOCK *lock;
PROCLOCK *proclock;
LOCALLOCKTAG localtag;
LOCALLOCK *locallock;
uint32 hashcode;
LWLockId partitionLock;
ResourceOwner owner;
ResPortalIncrement *incrementSet;
ResPortalTag portalTag;
/* Check the lock method bits. */
Assert(locktag->locktag_lockmethodid == RESOURCE_LOCKMETHOD);
/* Provide a resource owner. */
owner = CurrentResourceOwner;
/*
* Find the LOCALLOCK entry for this lock and lockmode
*/
MemSet(&localtag, 0, sizeof(localtag)); /* must clear padding */
localtag.lock = *locktag;
localtag.mode = lockmode;
locallock = (LOCALLOCK *)
hash_search(LockMethodLocalHash, (void *) &localtag, HASH_FIND, NULL);
/*
* If the lock request did not get very far, cleanup is easy.
*/
if (!locallock ||
!locallock->lock ||
!locallock->proclock)
{
elog(LOG, "Resource queue %d: no lock to release", locktag->locktag_field1);
if (locallock)
{
RemoveLocalLock(locallock);
}
return false;
}
hashcode = locallock->hashcode;
/* We are going to examine the shared lock table. */
partitionLock = LockHashPartitionLock(hashcode);
LWLockAcquire(partitionLock, LW_EXCLUSIVE);
/*
* Verify that our LOCALLOCK still matches the shared tables.
*
* While waiting for the lock, our request could have been canceled to
* resolve a deadlock. It could already have been removed from the shared
* LOCK and PROCLOCK tables, and those entries could have been reallocated
* for some other request. Then all we need to do is clean up the
* LOCALLOCK entry.
*/
lock = locallock->lock;
proclock = locallock->proclock;
if (proclock->tag.myLock != lock ||
proclock->tag.myProc != MyProc ||
memcmp(&locallock->tag.lock, &lock->tag, sizeof(lock->tag)) != 0)
{
LWLockRelease(partitionLock);
elog(DEBUG1, "Resource queue %d: lock already gone", locktag->locktag_field1);
RemoveLocalLock(locallock);
return false;
}
LWLockAcquire(ResQueueLock, LW_EXCLUSIVE);
/*
* Double-check that we are actually holding a lock of the type we want to
* release.
*/
if (!(proclock->holdMask & LOCKBIT_ON(lockmode)) || proclock->nLocks <= 0)
{
elog(DEBUG1, "Resource queue %d: proclock not held", locktag->locktag_field1);
RemoveLocalLock(locallock);
ResCleanUpLock(lock, proclock, hashcode, false);
LWLockRelease(ResQueueLock);
LWLockRelease(partitionLock);
return false;
}
/*
* Find the increment for this portal and process.
*/
MemSet(&portalTag, 0, sizeof(ResPortalTag));
portalTag.pid = MyProc->pid;
portalTag.portalId = resPortalId;
incrementSet = ResIncrementFind(&portalTag);
if (!incrementSet)
{
elog(DEBUG1, "Resource queue %d: increment not found on unlock", locktag->locktag_field1);
/*
* Clean up the locallock. Since a single locallock can represent
* multiple locked portals in the same backend, we can only remove it if
* this is the last portal.
*/
if (proclock->nLocks == 0)
{
RemoveLocalLock(locallock);
}
ResCleanUpLock(lock, proclock, hashcode, true);
LWLockRelease(ResQueueLock);
LWLockRelease(partitionLock);
return false;
}
/*
* Un-grant the lock.
*/
ResUnGrantLock(lock, proclock);
ResLockUpdateLimit(lock, proclock, incrementSet, false, false);
/*
* Perform clean-up, waking up any waiters!
*
* Clean up the locallock. Since a single locallock can represent
* multiple locked portals in the same backend, we can only remove it if
* this is the last portal.
*/
if (proclock->nLocks == 0)
RemoveLocalLock(locallock);
ResCleanUpLock(lock, proclock, hashcode, true);
/*
* Clean up the increment set.
*/
if (!ResIncrementRemove(&portalTag))
{
LWLockRelease(ResQueueLock);
LWLockRelease(partitionLock);
elog(ERROR, "no increment to remove for portal id %u and pid %d", resPortalId, MyProc->pid);
/* not reached */
}
LWLockRelease(ResQueueLock);
LWLockRelease(partitionLock);
/* Update execute statistics for this queue, count and elapsed time. */
pgstat_count_queue_exec(resPortalId, locktag->locktag_field1);
pgstat_record_end_queue_exec(resPortalId, locktag->locktag_field1);
return true;
}
bool
IsResQueueLockedForPortal(Portal portal)
{
return portal->hasResQueueLock;
}
/*
* ResLockCheckLimit -- test whether the given process acquiring this lock
* will cause a resource to exceed its limits.
*
* Notes:
* Returns STATUS_FOUND if limit will be exhausted, STATUS_OK if not.
*
* If increment is true, then the resource counter associated with the lock
* is to be incremented, if false then decremented.
*
* Named similarly to LockCheckConflicts() for standard locks, but it is
* not checking a table of lock mode conflicts, but whether a shared counter
* for some resource is exhausted.
*
* The resource queue lightweight lock (ResQueueLock) must be held while
* this function is called.
*
* MPP-4340: modified logic so that we return STATUS_OK when
* decrementing a resource -- decrements shouldn't care, let's not stop
* them from freeing resources!
*/
int
ResLockCheckLimit(LOCK *lock, PROCLOCK *proclock, ResPortalIncrement *incrementSet, bool increment)
{
ResQueue queue;
ResLimit limits;
bool over_limit = false;
bool will_overcommit = false;
int status = STATUS_OK;
Cost increment_amt;
int i;
Assert(LWLockHeldByMeInMode(ResQueueLock, LW_EXCLUSIVE));
/* Get the queue for this lock. */
queue = GetResQueueFromLock(lock);
limits = queue->limits;
for (i = 0; i < NUM_RES_LIMIT_TYPES; i++)
{
/*
* Skip the default threshold, as it means 'no limit'.
*/
if (limits[i].threshold_value == INVALID_RES_LIMIT_THRESHOLD)
continue;
switch (limits[i].type)
{
case RES_COUNT_LIMIT:
{
Assert((limits[i].threshold_is_max));
/* Setup whether to increment or decrement the # active. */
if (increment)
{
increment_amt = incrementSet->increments[i];
if (limits[i].current_value + increment_amt > limits[i].threshold_value)
over_limit = true;
}
else
{
increment_amt = -1 * incrementSet->increments[i];
}
#ifdef RESLOCK_DEBUG
elog(DEBUG1, "checking count limit threshold %.0f current %.0f",
limits[i].threshold_value, limits[i].current_value);
#endif
}
break;
case RES_COST_LIMIT:
{
Assert((limits[i].threshold_is_max));
/* Setup whether to increment or decrement the cost. */
if (increment)
{
increment_amt = incrementSet->increments[i];
/* Check if this will overcommit */
if (increment_amt > limits[i].threshold_value)
will_overcommit = true;
if (queue->overcommit)
{
/*
* Overcommit is enabled, so allow statements that
* blow out the limit if no one else is active!
*/
if ((limits[i].current_value + increment_amt > limits[i].threshold_value) &&
(limits[i].current_value > 0.1))
over_limit = true;
}
else
{
/*
* No overcommit, so always fail statements that
* blow out the limit.
*/
if (limits[i].current_value + increment_amt > limits[i].threshold_value)
over_limit = true;
}
}
else
{
increment_amt = -1 * incrementSet->increments[i];
}
#ifdef RESLOCK_DEBUG
elog(DEBUG1, "checking cost limit threshold %.2f current %.2f",
limits[i].threshold_value, limits[i].current_value);
#endif
}
break;
case RES_MEMORY_LIMIT:
{
Assert((limits[i].threshold_is_max));
/* Setup whether to increment or decrement the # active. */
if (increment)
{
increment_amt = incrementSet->increments[i];
if (limits[i].current_value + increment_amt > limits[i].threshold_value)
over_limit = true;
}
else
{
increment_amt = -1 * incrementSet->increments[i];
}
#ifdef RESLOCK_DEBUG
elog(DEBUG1, "checking memory limit threshold %.0f current %.0f",
limits[i].threshold_value, limits[i].current_value);
#endif
}
break;
default:
break;
}
}
if (will_overcommit && !queue->overcommit)
status = STATUS_ERROR;
else if (over_limit)
status = STATUS_FOUND;
return status;
}
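/*
 * Worked example (illustrative, not from the original file): suppose a
 * queue has a cost threshold of 100.0 and a statement arrives whose cost
 * increment is 150.0, so will_overcommit is set. With overcommit disabled
 * the caller gets STATUS_ERROR and the statement is rejected outright.
 * With overcommit enabled the statement gets STATUS_OK while the queue is
 * idle (current_value <= 0.1), but STATUS_FOUND (it must wait) once other
 * statements already hold cost in the queue.
 */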
/*
* ResLockUpdateLimit -- update the resource counter for this lock with the
* increment for the process.
*
* Notes:
* If increment is true, then the resource counter associated with the lock
* is to be incremented, if false then decremented.
*
* Warnings:
* The resource queue lightweight lock (ResQueueLock) must be held while
* this function is called.
*/
void
ResLockUpdateLimit(LOCK *lock, PROCLOCK *proclock, ResPortalIncrement *incrementSet, bool increment, bool inError)
{
ResQueue queue;
ResLimit limits;
Cost increment_amt;
int i;
Assert(LWLockHeldByMeInMode(ResQueueLock, LW_EXCLUSIVE));
/* Get the queue for this lock. */
queue = GetResQueueFromLock(lock);
limits = queue->limits;
for (i = 0; i < NUM_RES_LIMIT_TYPES; i++)
{
/*
* MPP-8454: NOTE that if our resource-queue has been modified since
* we locked our resources, on unlock it is possible that we're
* deducting an increment that we never added -- the lowest value we
* should allow is 0.0.
*
*/
switch (limits[i].type)
{
case RES_COUNT_LIMIT:
case RES_COST_LIMIT:
case RES_MEMORY_LIMIT:
{
Cost new_value;
Assert((limits[i].threshold_is_max));
/* setup whether to increment or decrement the # active. */
if (increment)
{
increment_amt = incrementSet->increments[i];
}
else
{
increment_amt = -1 * incrementSet->increments[i];
}
new_value = ceil(limits[i].current_value + increment_amt);
new_value = Max(new_value, 0.0);
limits[i].current_value = new_value;
}
break;
default:
break;
}
}
return;
}
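/*
 * Numeric example of the MPP-8454 clamp above (illustrative): if the queue
 * was altered while this portal held its lock, an unlock might subtract an
 * increment that was never added, e.g. current_value = 10.0 minus an
 * increment of 25.0 gives -15.0; Max(new_value, 0.0) clamps that to 0.0.
 */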
/*
* GetResQueueFromLock -- find the resource queue for a given lock.
*
* Notes:
* Should be handed a locktag containing a valid queue id.
* Should hold the resource queue lightweight lock during this operation.
*/
ResQueue
GetResQueueFromLock(LOCK *lock)
{
Assert(LWLockHeldByMeInMode(ResQueueLock, LW_EXCLUSIVE));
ResQueue queue = ResQueueHashFind(GET_RESOURCE_QUEUEID_FOR_LOCK(lock));
if (queue == NULL)
{
elog(ERROR, "cannot find queue id %d", GET_RESOURCE_QUEUEID_FOR_LOCK(lock));
}
return queue;
}
/*
* ResGrantLock -- grant a resource lock.
*
* Warnings:
* It is expected that the partition lock is held before calling this
* function, as the various shared queue counts are inspected.
*/
static void
ResGrantLock(LOCK *lock, PROCLOCK *proclock)
{
LOCKMODE lockmode = ExclusiveLock;
/* Update the standard lock stuff, for locks and proclocks. */
lock->nGranted++;
lock->granted[lockmode]++;
lock->grantMask |= LOCKBIT_ON(lockmode);
if (lock->granted[lockmode] == lock->requested[lockmode])
{
lock->waitMask &= LOCKBIT_OFF(lockmode); /* no more waiters. */
}
proclock->holdMask |= LOCKBIT_ON(lockmode);
Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
Assert(lock->nGranted <= lock->nRequested);
/* Update the holders count. */
proclock->nLocks++;
return;
}
/*
* ResUnGrantLock -- opposite of ResGrantLock.
*
* Notes:
* The equivalent standard lock function returns true only if there are waiters;
* we don't do this.
*
* Warnings:
* It is expected that the partition lock is held before calling this
* function, as the various shared queue counts are inspected.
*/
bool
ResUnGrantLock(LOCK *lock, PROCLOCK *proclock)
{
LOCKMODE lockmode = ExclusiveLock;
Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
Assert(lock->nGranted <= lock->nRequested);
/* Update the standard lock stuff. */
lock->nRequested--;
lock->requested[lockmode]--;
lock->nGranted--;
lock->granted[lockmode]--;
if (lock->granted[lockmode] == 0)
{
/* change the conflict mask. No more of this lock type. */
lock->grantMask &= LOCKBIT_OFF(lockmode);
}
/* Update the holders count. */
proclock->nLocks--;
/* Fix the per-proclock state. */
if (proclock->nLocks == 0)
{
proclock->holdMask &= LOCKBIT_OFF(lockmode);
}
return true;
}
/*
* ResCleanUpLock -- lock cleanup, remove entry from lock queues and start
* waking up waiters.
*
* MPP-6055/MPP-6144: we get called more than once; if we've already cleaned
* up, don't walk off the end of lists or panic when we can't find our
* hash table entries.
*/
static void
ResCleanUpLock(LOCK *lock, PROCLOCK *proclock, uint32 hashcode, bool wakeupNeeded)
{
Assert(LWLockHeldByMeInMode(ResQueueLock, LW_EXCLUSIVE));
/*
* This check should really be an assertion. But to guard against edge cases
* previously not encountered, PANIC instead.
*/
if (lock->tag.locktag_type != LOCKTAG_RESOURCE_QUEUE ||
proclock->tag.myLock->tag.locktag_type != LOCKTAG_RESOURCE_QUEUE)
{
ereport(PANIC,
errmsg("We are trying to clean up a non-resource queue lock"),
errdetail("lock's locktag type = %d and proclock's locktag type = %d",
lock->tag.locktag_type,
proclock->tag.myLock->tag.locktag_type));
}
/*
* If this was my last hold on this lock, delete my entry in the proclock
* table.
*/
if (proclock->holdMask == 0 && proclock->nLocks == 0)
{
uint32 proclock_hashcode;
if (proclock->lockLink.next != NULL)
SHMQueueDelete(&proclock->lockLink);
if (proclock->procLink.next != NULL)
SHMQueueDelete(&proclock->procLink);
proclock_hashcode = ProcLockHashCode(&proclock->tag, hashcode);
hash_search_with_hash_value(LockMethodProcLockHash, (void *) &(proclock->tag),
proclock_hashcode, HASH_REMOVE, NULL);
}
if (lock->nRequested == 0)
{
/*
* The caller just released the last lock, so garbage-collect the lock
* object.
*/
Assert(SHMQueueEmpty(&(lock->procLocks)));
hash_search(LockMethodLockHash, (void *) &(lock->tag), HASH_REMOVE, NULL);
}
/*
* If appropriate, awaken any waiters.
*/
if (wakeupNeeded)
{
ResProcLockRemoveSelfAndWakeup(lock);
}
return;
}
/*
* ResWaitOnLock -- wait to acquire a resource lock.
*
*
* Warnings:
* It is expected that the partition lock is held before calling this
* function, as the various shared queue counts are inspected.
*/
static void
ResWaitOnLock(LOCALLOCK *locallock, ResourceOwner owner, ResPortalIncrement *incrementSet)
{
uint32 hashcode = locallock->hashcode;
LWLockId partitionLock = LockHashPartitionLock(hashcode);
char new_status[160];
const char *old_status;
int len;
/* Report change to waiting status */
if (update_process_title)
{
/* We should avoid using palloc() here */
old_status = get_real_act_ps_display(&len);
len = Min(len, sizeof(new_status) - 9);
snprintf(new_status, sizeof(new_status), "%.*s queuing",
len, old_status);
set_ps_display(new_status, false);
/* Truncate off " queuing" */
new_status[len] = '\0';
}
awaitedLock = locallock;
awaitedOwner = owner;
/*
* Now sleep.
*
* NOTE: self-deadlocks will throw (do a non-local return).
*/
if (ResProcSleep(ExclusiveLock, locallock, incrementSet) != STATUS_OK)
{
/*
* We failed as a result of a deadlock, see CheckDeadLock(). Quit now.
*/
LWLockRelease(partitionLock);
DeadLockReport();
}
awaitedLock = NULL;
/* Report change to non-waiting status */
if (update_process_title)
{
set_ps_display(new_status, false);
}
return;
}
/*
* ResProcLockRemoveSelfAndWakeup -- awaken any processes waiting on a resource lock.
*
* Notes:
* It always removes itself from the wait list.
* We must only awaken as many waiters as the resource controlled by
* the lock allows!
*/
void
ResProcLockRemoveSelfAndWakeup(LOCK *lock)
{
PROC_QUEUE *waitQueue = &(lock->waitProcs);
int queue_size = waitQueue->size;
PGPROC *proc;
uint32 hashcode;
LWLockId partitionLock;
int status;
Assert(LWLockHeldByMeInMode(ResQueueLock, LW_EXCLUSIVE));
/*
* XXX: This code is ugly and hard to read -- it should be a lot simpler,
* especially when there are some odd cases (process sitting on its own
* wait-queue).
*/
Assert(queue_size >= 0);
if (queue_size == 0)
{
return;
}
proc = (PGPROC *) waitQueue->links.next;
while (queue_size-- > 0)
{
/*
* Get the portal we are waiting on, and then its set of increments.
*/
ResPortalTag portalTag;
ResPortalIncrement *incrementSet;
/* Our own process may be on our wait-queue! */
if (proc->pid == MyProc->pid)
{
PGPROC *nextproc;
nextproc = (PGPROC *) proc->links.next;
SHMQueueDelete(&(proc->links));
(proc->waitLock->waitProcs.size)--;
proc = nextproc;
continue;
}
MemSet(&portalTag, 0, sizeof(ResPortalTag));
portalTag.pid = proc->pid;
portalTag.portalId = proc->waitPortalId;
incrementSet = ResIncrementFind(&portalTag);
if (!incrementSet)
{
hashcode = LockTagHashCode(&(lock->tag));
partitionLock = LockHashPartitionLock(hashcode);
LWLockRelease(partitionLock);
elog(ERROR, "no increment data for portal id %u and pid %d", proc->waitPortalId, proc->pid);
}
/*
* See if it is ok to wake this guy. (note that the wakeup writes to
* the wait list, and gives back a *new* next proc).
*/
status = ResLockCheckLimit(lock, proc->waitProcLock, incrementSet, true);
if (status == STATUS_OK)
{
ResGrantLock(lock, proc->waitProcLock);
ResLockUpdateLimit(lock, proc->waitProcLock, incrementSet, true, false);
proc = ResProcWakeup(proc, STATUS_OK);
}
else
{
/* Otherwise move on to the next guy. */
proc = (PGPROC *) proc->links.next;
}
}
Assert(waitQueue->size >= 0);
return;
}
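/*
 * Worked example (illustrative): a queue with a count threshold of 2 has
 * both slots in use and three backends waiting. When one holder releases,
 * this walk starts at the head of waitProcs, admits the first waiter whose
 * increments still fit (ResLockCheckLimit returns STATUS_OK), and leaves
 * the rest queued, since granting that waiter exhausts the count limit
 * again.
 */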
/*
* ResProcWakeup -- wake a sleeping process.
*
* (could we just use ProcWakeup here?)
*/
PGPROC *
ResProcWakeup(PGPROC *proc, int waitStatus)
{
PGPROC *retProc;
/* Proc should be sleeping ... */
if (proc->links.prev == NULL ||
proc->links.next == NULL)
return NULL;
/* Save next process before we zap the list link */
retProc = (PGPROC *) proc->links.next;
/* Remove process from wait queue */
SHMQueueDelete(&(proc->links));
(proc->waitLock->waitProcs.size)--;
/* Clean up process' state and pass it the ok/fail signal */
proc->waitLock = NULL;
proc->waitProcLock = NULL;
proc->waitStatus = waitStatus;
/* And awaken it */
SetLatch(&proc->procLatch);
return retProc;
}
/*
* ResRemoveFromWaitQueue -- Remove a process from the wait queue, cleaning up
* any locks.
*/
void
ResRemoveFromWaitQueue(PGPROC *proc, uint32 hashcode)
{
LOCK *waitLock = proc->waitLock;
PROCLOCK *proclock = proc->waitProcLock;
LOCKMODE lockmode = proc->waitLockMode;
#ifdef USE_ASSERT_CHECKING
LOCKMETHODID lockmethodid = LOCK_LOCKMETHOD(*waitLock);
#endif /* USE_ASSERT_CHECKING */
ResPortalTag portalTag;
/* Make sure lockmethod is for a resource lock. */
Assert(lockmethodid == RESOURCE_LOCKMETHOD);
/* Make sure proc is waiting */
Assert(proc->links.next != NULL);
Assert(waitLock);
Assert(waitLock->waitProcs.size > 0);
/* Remove proc from lock's wait queue */
SHMQueueDelete(&(proc->links));
waitLock->waitProcs.size--;
/* Undo increments of request counts by waiting process */
Assert(waitLock->nRequested > 0);
Assert(waitLock->nRequested > proc->waitLock->nGranted);
waitLock->nRequested--;
Assert(waitLock->requested[lockmode] > 0);
waitLock->requested[lockmode]--;
/* don't forget to clear waitMask bit if appropriate */
if (waitLock->granted[lockmode] == waitLock->requested[lockmode])
waitLock->waitMask &= LOCKBIT_OFF(lockmode);
/* Clean up the proc's own state */
proc->waitLock = NULL;
proc->waitProcLock = NULL;
proc->waitStatus = STATUS_ERROR;
/*
* Remove the waited on portal increment.
*/
MemSet(&portalTag, 0, sizeof(ResPortalTag));
portalTag.pid = MyProc->pid;
portalTag.portalId = MyProc->waitPortalId;
LWLockAcquire(ResQueueLock, LW_EXCLUSIVE);
ResIncrementRemove(&portalTag);
/*
* Delete the proclock immediately if it represents no already-held locks.
* (This must happen now because if the owner of the lock decides to
* release it, and the requested/granted counts then go to zero,
* LockRelease expects there to be no remaining proclocks.) Then see if
* any other waiters for the lock can be woken up now.
*/
ResCleanUpLock(waitLock, proclock, hashcode, true);
LWLockRelease(ResQueueLock);
}
/*
* ResCheckSelfDeadLock -- Check to see if I am going to deadlock myself.
*
* What happens here is we scan our own set of portals and total up the
* increments. If this exceeds any of the thresholds for the queue then
* we need to signal that a self deadlock is about to occur - modulo some
* footwork for overcommit-able queues.
*/
bool
ResCheckSelfDeadLock(LOCK *lock, PROCLOCK *proclock, ResPortalIncrement *incrementSet)
{
ResQueue queue;
ResLimit limits;
int i;
Cost incrementTotals[NUM_RES_LIMIT_TYPES];
int numPortals = 0;
bool countThresholdOvercommitted = false;
bool costThresholdOvercommitted = false;
bool memoryThresholdOvercommitted = false;
bool result = false;
/* Get the resource queue lock before checking the increments. */
LWLockAcquire(ResQueueLock, LW_EXCLUSIVE);
/* Get the queue for this lock. */
queue = GetResQueueFromLock(lock);
limits = queue->limits;
/* Get the increment totals and number of portals for this queue. */
TotalResPortalIncrements(MyProc->pid, queue->queueid,
incrementTotals, &numPortals);
/*
* Now check them against the thresholds using the same logic as
* ResLockCheckLimit.
*/
for (i = 0; i < NUM_RES_LIMIT_TYPES; i++)
{
if (limits[i].threshold_value == INVALID_RES_LIMIT_THRESHOLD)
{
continue;
}
switch (limits[i].type)
{
case RES_COUNT_LIMIT:
{
if (incrementTotals[i] > limits[i].threshold_value)
{
countThresholdOvercommitted = true;
}
}
break;
case RES_COST_LIMIT:
{
if (incrementTotals[i] > limits[i].threshold_value)
{
costThresholdOvercommitted = true;
}
}
break;
case RES_MEMORY_LIMIT:
{
if (incrementTotals[i] > limits[i].threshold_value)
{
memoryThresholdOvercommitted = true;
}
}
break;
}
}
/* If any threshold is overcommitted then set the result. */
if (countThresholdOvercommitted || costThresholdOvercommitted || memoryThresholdOvercommitted)
{
result = true;
}
/*
* If the queue can be overcommitted and we are overcommitting with 1
* portal and *not* overcommitting the count threshold then don't trigger
* a self deadlock.
*/
if (queue->overcommit && numPortals == 1 && !countThresholdOvercommitted)
{
result = false;
}
if (result)
{
/*
* We're about to abort out of a partially completed lock acquisition.
*
* In order to allow our ref-counts to figure out how to clean things
* up we're going to "grant" the lock, which will immediately be
* cleaned up when our caller throws an ERROR.
*/
if (lock->nRequested > lock->nGranted)
{
/* we're no longer waiting. */
pgstat_report_wait_end();
ResGrantLock(lock, proclock);
ResLockUpdateLimit(lock, proclock, incrementSet, true, true);
}
/* our caller will throw an ERROR. */
}
LWLockRelease(ResQueueLock);
return result;
}
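/*
 * Worked example (illustrative): a backend holds an open cursor that has
 * locked cost 80.0 against a queue whose cost threshold is 100.0. If the
 * same backend opens a second portal with a cost increment of 30.0, its own
 * totals reach 110.0 > 100.0, so waiting would block on resources the
 * backend itself holds - a self deadlock. The exception above lets a single
 * portal overcommit an overcommit-able queue without tripping this check.
 */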
/*
* ResPortalIncrementHashTableInit - Initialize the increment hash.
*
* Notes:
* This stores the possible increments that a given statement will cause to
* be added to the limits for a resource queue.
* We allocate one extra slot for each backend, to free us from counting
* unnamed portals.
*/
bool
ResPortalIncrementHashTableInit(void)
{
HASHCTL info;
long max_table_size = (MaxResourcePortalsPerXact + 1) * MaxBackends;
int hash_flags;
/* Set key and entry sizes. */
MemSet(&info, 0, sizeof(info));
info.keysize = sizeof(ResPortalTag);
info.entrysize = sizeof(ResPortalIncrement);
info.hash = tag_hash;
hash_flags = (HASH_ELEM | HASH_FUNCTION);
ResPortalIncrementHash = ShmemInitHash("Portal Increment Hash",
max_table_size / 2,
max_table_size,
&info,
hash_flags);
if (!ResPortalIncrementHash)
{
return false;
}
return true;
}
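/*
 * Sizing example (illustrative), assuming MaxResourcePortalsPerXact is the
 * max_resource_portals_per_transaction GUC: with that set to 64 and
 * MaxBackends = 100, the table is created with (64 + 1) * 100 = 6500
 * maximum entries and half that as its initial size.
 */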
/*
* ResIncrementAdd -- Add a new increment element to the increment hash.
*
* We return the increment added. We return NULL if we run out of shared
* memory. In case there is an existing increment element in the hash table,
* we have encountered a duplicate portal - so we return the existing increment
* for ERROR reporting purposes. The status output argument is updated to
* indicate the outcome of the routine.
*
* The resource queue lightweight lock (ResQueueLock) *must* be held for
* this operation.
*/
static ResPortalIncrement *
ResIncrementAdd(ResPortalIncrement *incSet,
PROCLOCK *proclock,
ResourceOwner owner,
ResIncrementAddStatus *status)
{
ResPortalIncrement *incrementSet;
ResPortalTag portaltag;
int i;
bool found;
Assert(LWLockHeldByMeInMode(ResQueueLock, LW_EXCLUSIVE));
#ifdef FAULT_INJECTOR
/* Simulate an out-of-shared-memory error by bypassing the increment hash. */
if (FaultInjector_InjectFaultIfSet("res_increment_add_oosm",
DDLNotSpecified,
"",
"") == FaultInjectorTypeSkip)
{
*status = RES_INCREMENT_ADD_OOSM;
return NULL;
}
#endif
/* Set up the key. */
MemSet(&portaltag, 0, sizeof(ResPortalTag));
portaltag.pid = incSet->pid;
portaltag.portalId = incSet->portalId;
/* Add (or find) the value. */
incrementSet = (ResPortalIncrement *)
hash_search(ResPortalIncrementHash, (void *) &portaltag, HASH_ENTER_NULL, &found);
if (!incrementSet)
{
*status = RES_INCREMENT_ADD_OOSM;
return NULL;
}
/* Initialize it. */
if (!found)
{
incrementSet->pid = incSet->pid;
incrementSet->portalId = incSet->portalId;
incrementSet->isHold = incSet->isHold;
incrementSet->isCommitted = false;
for (i = 0; i < NUM_RES_LIMIT_TYPES; i++)
{
incrementSet->increments[i] = incSet->increments[i];
}
SHMQueueInsertBefore(&proclock->portalLinks, &incrementSet->portalLink);
}
else
{
/* We have added this portal id before - something has gone wrong! */
ResIncrementRemove(&portaltag);
*status = RES_INCREMENT_ADD_DUPLICATE_PORTAL;
return incrementSet;
}
*status = RES_INCREMENT_ADD_OK;
return incrementSet;
}
/*
* ResIncrementFind -- Find the increment for a portal and process.
*
* Notes
* Return a pointer to where the increment is stored (NULL if not found).
*
* The resource queue lightweight lock (ResQueueLock) *must* be held for
* this operation.
*/
ResPortalIncrement *
ResIncrementFind(ResPortalTag *portaltag)
{
ResPortalIncrement *incrementSet;
bool found;
Assert(LWLockHeldByMeInMode(ResQueueLock, LW_EXCLUSIVE));
incrementSet = (ResPortalIncrement *)
hash_search(ResPortalIncrementHash, (void *) portaltag, HASH_FIND, &found);
if (!incrementSet)
{
return NULL;
}
return incrementSet;
}
/*
* ResIncrementRemove -- Remove an increment for a portal and process.
*
* Notes
* The resource queue lightweight lock (ResQueueLock) *must* be held for
* this operation.
*/
static bool
ResIncrementRemove(ResPortalTag *portaltag)
{
ResPortalIncrement *incrementSet;
bool found;
Assert(LWLockHeldByMeInMode(ResQueueLock, LW_EXCLUSIVE));
incrementSet = (ResPortalIncrement *)
hash_search(ResPortalIncrementHash, (void *) portaltag, HASH_REMOVE, &found);
if (incrementSet == NULL)
{
return false;
}
SHMQueueDelete(&incrementSet->portalLink);
return true;
}
/*
* ResQueueHashTableInit -- initialize the hash table of resource queues.
*
* Notes:
*/
bool
ResQueueHashTableInit(void)
{
HASHCTL info;
int hash_flags;
/* Set key and entry sizes. */
MemSet(&info, 0, sizeof(info));
info.keysize = sizeof(Oid);
info.entrysize = sizeof(ResQueueData);
info.hash = tag_hash;
hash_flags = (HASH_ELEM | HASH_FUNCTION);
#ifdef RESLOCK_DEBUG
elog(DEBUG1, "Creating hash table for %d queues", MaxResourceQueues);
#endif
ResQueueHash = ShmemInitHash("Queue Hash",
MaxResourceQueues,
MaxResourceQueues,
&info,
hash_flags);
if (!ResQueueHash)
return false;
return true;
}
/*
* ResQueueHashNew -- return a new (empty) queue object to initialize.
*
* Notes
* The resource queue lightweight lock (ResQueueLock) *must* be held for
* this operation.
*/
ResQueue
ResQueueHashNew(Oid queueid)
{
bool found;
ResQueueData *queue;
Assert(LWLockHeldByMeInMode(ResQueueLock, LW_EXCLUSIVE));
queue = (ResQueueData *)
hash_search(ResQueueHash, (void *) &queueid, HASH_ENTER_NULL, &found);
/* caller should test that the queue does not exist already */
Assert(!found);
if (!queue)
return NULL;
return (ResQueue) queue;
}
/*
* ResQueueHashFind -- return the queue for a given oid.
*
* Notes
* The resource queue lightweight lock (ResQueueLock) *must* be held for
* this operation.
*/
ResQueue
ResQueueHashFind(Oid queueid)
{
bool found;
ResQueueData *queue;
Assert(LWLockHeldByMeInMode(ResQueueLock, LW_EXCLUSIVE));
queue = (ResQueueData *)
hash_search(ResQueueHash, (void *) &queueid, HASH_FIND, &found);
if (!queue)
return NULL;
return (ResQueue) queue;
}
/*
* ResQueueHashRemove -- remove the queue for a given oid.
*
* Notes
* The resource queue lightweight lock (ResQueueLock) *must* be held for
* this operation.
*/
bool
ResQueueHashRemove(Oid queueid)
{
bool found;
void *queue;
Assert(LWLockHeldByMeInMode(ResQueueLock, LW_EXCLUSIVE));
queue = hash_search(ResQueueHash, (void *) &queueid, HASH_REMOVE, &found);
if (!queue)
return false;
return true;
}
/* Number of columns produced by pg_resqueue_status() */
#define PG_RESQUEUE_STATUS_COLUMNS 5
/*
* pg_resqueue_status - produce a view with one row per resource queue
* showing internal information (counter values, waiters, holders).
*/
Datum
pg_resqueue_status(PG_FUNCTION_ARGS)
{
FuncCallContext *funcctx = NULL;
Datum result;
MemoryContext oldcontext = NULL;
QueueStatusContext *fctx = NULL; /* User function context. */
HeapTuple tuple = NULL;
if (SRF_IS_FIRSTCALL())
{
funcctx = SRF_FIRSTCALL_INIT();
/* Switch context when allocating stuff to be used in later calls */
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
fctx = (QueueStatusContext *) palloc(sizeof(QueueStatusContext));
/*
* Allocate space for the per-call area - this overestimates, but
* means we can take the resource scheduler lock after our memory
* context switching.
*/
fctx->record = (QueueStatusRec *) palloc(sizeof(QueueStatusRec) * MaxResourceQueues);
funcctx->user_fctx = fctx;
/* Construct a tuple descriptor for the result rows. */
TupleDesc tupledesc = CreateTemplateTupleDesc(PG_RESQUEUE_STATUS_COLUMNS);
TupleDescInitEntry(tupledesc, (AttrNumber) 1, "queueid", OIDOID, -1, 0);
TupleDescInitEntry(tupledesc, (AttrNumber) 2, "queuecountvalue", FLOAT4OID, -1, 0);
TupleDescInitEntry(tupledesc, (AttrNumber) 3, "queuecostvalue", FLOAT4OID, -1, 0);
TupleDescInitEntry(tupledesc, (AttrNumber) 4, "queuewaiters", INT4OID, -1, 0);
TupleDescInitEntry(tupledesc, (AttrNumber) 5, "queueholders", INT4OID, -1, 0);
funcctx->tuple_desc = BlessTupleDesc(tupledesc);
/* Return to original context when allocating transient memory */
MemoryContextSwitchTo(oldcontext);
if (IsResQueueEnabled())
{
/* Get a snapshot of current state of resource queues */
BuildQueueStatusContext(fctx);
funcctx->max_calls = fctx->numRecords;
}
else
{
funcctx->max_calls = fctx->numRecords = 0;
}
}
funcctx = SRF_PERCALL_SETUP();
/* Get the saved state. */
fctx = funcctx->user_fctx;
if (funcctx->call_cntr < funcctx->max_calls)
{
int i = funcctx->call_cntr;
QueueStatusRec *record = &fctx->record[i];
Datum values[PG_RESQUEUE_STATUS_COLUMNS];
bool nulls[PG_RESQUEUE_STATUS_COLUMNS];
values[0] = ObjectIdGetDatum(record->queueid);
nulls[0] = false;
/* Make the counters null if the limit is disabled. */
if (record->queuecountthreshold != INVALID_RES_LIMIT_THRESHOLD)
{
values[1] = Float4GetDatum(record->queuecountvalue);
nulls[1] = false;
}
else
nulls[1] = true;
if (record->queuecostthreshold != INVALID_RES_LIMIT_THRESHOLD)
{
values[2] = Float4GetDatum(record->queuecostvalue);
nulls[2] = false;
}
else
nulls[2] = true;
values[3] = Int32GetDatum(record->queuewaiters);
nulls[3] = false;
values[4] = Int32GetDatum(record->queueholders);
nulls[4] = false;
/* Build and return the tuple. */
tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
result = HeapTupleGetDatum(tuple);
SRF_RETURN_NEXT(funcctx, result);
}
else
SRF_RETURN_DONE(funcctx);
}
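/*
 * Example usage (illustrative), assuming the function is registered in the
 * catalog under the same name:
 *
 *     SELECT queueid, queuecountvalue, queuecostvalue,
 *            queuewaiters, queueholders
 *     FROM pg_resqueue_status();
 *
 * queuecountvalue/queuecostvalue come back NULL for limits that are
 * disabled on the queue.
 */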
/**
* This copies out the current state of resource queues.
*/
static void
BuildQueueStatusContext(QueueStatusContext *fctx)
{
int num_calls = 0;
int numRecords;
int i;
HASH_SEQ_STATUS status;
ResQueueData *queue = NULL;
Assert(fctx);
Assert(fctx->record);
/*
* Take all the partition locks. This is necessary as we want to use
* the same lock order as the rest of the code - i.e. partition locks
* *first* *then* the queue lock (otherwise we could deadlock ourselves).
*/
for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
LWLockAcquire(LockHashPartitionLockByIndex(i), LW_EXCLUSIVE);
/*
* Lock resource queue structures.
*/
LWLockAcquire(ResQueueLock, LW_EXCLUSIVE);
/* Initialize for a sequential scan of the resource queue hash. */
hash_seq_init(&status, ResQueueHash);
num_calls = hash_get_num_entries(ResQueueHash);
Assert(num_calls == ResScheduler->num_queues);
numRecords = 0;
while ((queue = (ResQueueData *) hash_seq_search(&status)) != NULL)
{
QueueStatusRec *record = &fctx->record[numRecords];
int j;
ResLimit limits = NULL;
uint32 hashcode;
/**
* Gather thresholds and current values for active statements, cost and memory
*/
limits = queue->limits;
record->queueid = queue->queueid;
for (j = 0; j < NUM_RES_LIMIT_TYPES; j++)
{
switch (limits[j].type)
{
case RES_COUNT_LIMIT:
record->queuecountthreshold = limits[j].threshold_value;
record->queuecountvalue = limits[j].current_value;
break;
case RES_COST_LIMIT:
record->queuecostthreshold = limits[j].threshold_value;
record->queuecostvalue = limits[j].current_value;
break;
case RES_MEMORY_LIMIT:
record->queuememthreshold = limits[j].threshold_value;
record->queuememvalue = limits[j].current_value;
break;
default:
elog(ERROR, "unrecognized resource queue limit type: %d", limits[j].type);
}
}
/*
* Get the holders and waiters count for the corresponding resource
* lock.
*/
LOCKTAG tag;
LOCK *lock;
SET_LOCKTAG_RESOURCE_QUEUE(tag, queue->queueid);
hashcode = LockTagHashCode(&tag);
bool found = false;
lock = (LOCK *)
hash_search_with_hash_value(LockMethodLockHash, (void *) &tag, hashcode, HASH_FIND, &found);
if (!found || !lock)
{
record->queuewaiters = 0;
record->queueholders = 0;
}
else
{
record->queuewaiters = lock->nRequested - lock->nGranted;
record->queueholders = lock->nGranted;
}
numRecords++;
Assert(numRecords <= MaxResourceQueues);
}
/* Release the resource scheduler lock. */
LWLockRelease(ResQueueLock);
/* ...and the partition locks. */
for (i = NUM_LOCK_PARTITIONS; --i >= 0;)
LWLockRelease(LockHashPartitionLockByIndex(i));
/* Set the real no. of calls as we know it now! */
fctx->numRecords = numRecords;
return;
}
/* Number of records produced per queue. */
#define PG_RESQUEUE_STATUS_KV_RECORDS_PER_QUEUE 8
/* Number of columns produced by function */
#define PG_RESQUEUE_STATUS_KV_COLUMNS 3
/* Scratch space used to write out strings */
#define PG_RESQUEUE_STATUS_KV_BUFSIZE 256
/*
* pg_resqueue_status_kv - outputs the current state of resource queues in the following format:
* (queueid, key, value) where key and value are text. This makes the function extremely flexible.
*/
Datum
pg_resqueue_status_kv(PG_FUNCTION_ARGS)
{
FuncCallContext *funcctx = NULL;
Datum result;
MemoryContext oldcontext = NULL;
QueueStatusContext *fctx = NULL; /* User function context. */
HeapTuple tuple = NULL;
if (SRF_IS_FIRSTCALL())
{
funcctx = SRF_FIRSTCALL_INIT();
/* Switch context when allocating stuff to be used in later calls */
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
fctx = (QueueStatusContext *) palloc(sizeof(QueueStatusContext));
/*
* Allocate space for the per-call area - this overestimates, but
* means we can take the resource scheduler lock after our memory
* context switching.
*/
fctx->record = (QueueStatusRec *) palloc(sizeof(QueueStatusRec) * MaxResourceQueues);
funcctx->user_fctx = fctx;
/* Construct a tuple descriptor for the result rows. */
TupleDesc tupledesc = CreateTemplateTupleDesc(PG_RESQUEUE_STATUS_KV_COLUMNS);
TupleDescInitEntry(tupledesc, (AttrNumber) 1, "queueid", OIDOID, -1, 0);
TupleDescInitEntry(tupledesc, (AttrNumber) 2, "key", TEXTOID, -1, 0);
TupleDescInitEntry(tupledesc, (AttrNumber) 3, "value", TEXTOID, -1, 0);
funcctx->tuple_desc = BlessTupleDesc(tupledesc);
/* Return to original context when allocating transient memory */
MemoryContextSwitchTo(oldcontext);
if (IsResQueueEnabled())
{
/* Get a snapshot of current state of resource queues */
BuildQueueStatusContext(fctx);
funcctx->max_calls = fctx->numRecords * PG_RESQUEUE_STATUS_KV_RECORDS_PER_QUEUE;
}
else
{
funcctx->max_calls = fctx->numRecords = 0;
}
}
funcctx = SRF_PERCALL_SETUP();
/* Get the saved state. */
fctx = funcctx->user_fctx;
if (funcctx->call_cntr < funcctx->max_calls)
{
int i = funcctx->call_cntr / PG_RESQUEUE_STATUS_KV_RECORDS_PER_QUEUE; /* record number */
int j = funcctx->call_cntr % PG_RESQUEUE_STATUS_KV_RECORDS_PER_QUEUE; /* which attribute is
* being produced */
QueueStatusRec *record = &fctx->record[i];
Datum values[PG_RESQUEUE_STATUS_KV_COLUMNS];
bool nulls[PG_RESQUEUE_STATUS_KV_COLUMNS];
char buf[PG_RESQUEUE_STATUS_KV_BUFSIZE];
nulls[0] = false;
nulls[1] = false;
nulls[2] = false;
values[0] = ObjectIdGetDatum(record->queueid);
switch (j)
{
case 0:
values[1] = PointerGetDatum(cstring_to_text("rsqcountlimit"));
snprintf(buf, ARRAY_SIZE(buf), "%d", (int) ceil(record->queuecountthreshold));
values[2] = PointerGetDatum(cstring_to_text(buf));
break;
case 1:
values[1] = PointerGetDatum(cstring_to_text("rsqcountvalue"));
snprintf(buf, ARRAY_SIZE(buf), "%d", (int) ceil(record->queuecountvalue));
values[2] = PointerGetDatum(cstring_to_text(buf));
break;
case 2:
values[1] = PointerGetDatum(cstring_to_text("rsqcostlimit"));
snprintf(buf, ARRAY_SIZE(buf), "%.2f", record->queuecostthreshold);
values[2] = PointerGetDatum(cstring_to_text(buf));
break;
case 3:
values[1] = PointerGetDatum(cstring_to_text("rsqcostvalue"));
snprintf(buf, ARRAY_SIZE(buf), "%.2f", record->queuecostvalue);
values[2] = PointerGetDatum(cstring_to_text(buf));
break;
case 4:
values[1] = PointerGetDatum(cstring_to_text("rsqmemorylimit"));
snprintf(buf, ARRAY_SIZE(buf), "%.2f", record->queuememthreshold);
values[2] = PointerGetDatum(cstring_to_text(buf));
break;
case 5:
values[1] = PointerGetDatum(cstring_to_text("rsqmemoryvalue"));
snprintf(buf, ARRAY_SIZE(buf), "%.2f", record->queuememvalue);
values[2] = PointerGetDatum(cstring_to_text(buf));
break;
case 6:
values[1] = PointerGetDatum(cstring_to_text("rsqwaiters"));
snprintf(buf, ARRAY_SIZE(buf), "%d", record->queuewaiters);
values[2] = PointerGetDatum(cstring_to_text(buf));
break;
case 7:
values[1] = PointerGetDatum(cstring_to_text("rsqholders"));
snprintf(buf, ARRAY_SIZE(buf), "%d", record->queueholders);
values[2] = PointerGetDatum(cstring_to_text(buf));
break;
default:
Assert(false && "Cannot reach here");
}
/* Build and return the tuple. */
tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
result = HeapTupleGetDatum(tuple);
SRF_RETURN_NEXT(funcctx, result);
}
else
SRF_RETURN_DONE(funcctx);
}
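/*
 * Example output (illustrative): each queue yields
 * PG_RESQUEUE_STATUS_KV_RECORDS_PER_QUEUE = 8 rows of (queueid, key, value)
 * text pairs, e.g. for a queue with oid 16385:
 *
 *     (16385, 'rsqcountlimit', '20')
 *     (16385, 'rsqcountvalue', '3')
 *     (16385, 'rsqcostlimit',  '2000.00')
 *     ...
 *     (16385, 'rsqholders',    '3')
 */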
/**
* Return the memory limit on a queue, per the catalog, in bytes. Returns -1 if not set.
*/
int64 ResourceQueueGetMemoryLimitInCatalog(Oid queueId)
{
int memoryLimitKB = -1;
Assert(queueId != InvalidOid);
List * capabilitiesList = GetResqueueCapabilityEntry(queueId); /* This is a list of lists */
ListCell *le = NULL;
foreach(le, capabilitiesList)
{
List *entry = NULL;
Value *key = NULL;
entry = (List *) lfirst(le);
Assert(entry);
key = (Value *) linitial(entry);
Assert(key->type == T_Integer); /* This is resource type id */
if (intVal(key) == PG_RESRCTYPE_MEMORY_LIMIT)
{
Value *val = lsecond(entry);
Assert(val->type == T_String);
#ifdef USE_ASSERT_CHECKING
bool result =
#else
(void)
#endif
parse_int(strVal(val), &memoryLimitKB, GUC_UNIT_KB, NULL);
Assert(result);
}
}
list_free(capabilitiesList);
Assert(memoryLimitKB == -1 || memoryLimitKB > 0);
if (memoryLimitKB == -1)
{
return (int64) -1;
}
return (int64) memoryLimitKB * 1024;
}
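/*
 * Unit example (illustrative): a catalog entry of '2GB' for
 * PG_RESRCTYPE_MEMORY_LIMIT parses to memoryLimitKB = 2097152 via
 * parse_int(..., GUC_UNIT_KB, ...), so the function returns
 * 2097152 * 1024 = 2147483648 bytes.
 */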
/**
* Get memory limit associated with queue in bytes.
* Returns -1 if a limit does not exist.
*/
int64 ResourceQueueGetMemoryLimit(Oid queueId)
{
int64 memoryLimitBytes = -1;
Assert(queueId != InvalidOid);
if (!IsResManagerMemoryPolicyNone())
{
memoryLimitBytes = ResourceQueueGetMemoryLimitInCatalog(queueId);
}
return memoryLimitBytes;
}
/**
* Given a queue id, how much memory (in bytes) should a query take?
*/
uint64 ResourceQueueGetQueryMemoryLimit(PlannedStmt *stmt, Oid queueId)
{
Assert(Gp_role == GP_ROLE_DISPATCH || Gp_role == GP_ROLE_UTILITY);
Assert(queueId != InvalidOid);
/* resource queue will not limit super user */
if (superuser())
return ResourceQueueGetSuperuserQueryMemoryLimit();
if (IsResManagerMemoryPolicyNone())
return 0;
/** Assert that I do not hold lwlock */
Assert(!LWLockHeldByMeInMode(ResQueueLock, LW_EXCLUSIVE));
int64 resqLimitBytes = ResourceQueueGetMemoryLimit(queueId);
/**
* If there is no memory limit on the queue, simply use statement_mem.
*/
AssertImply(resqLimitBytes < 0, resqLimitBytes == -1);
if (resqLimitBytes == -1)
{
return (uint64) statement_mem * 1024L;
}
/**
* Take the ResQueueLock in exclusive mode for the lookup, so that nobody
* can modify any resource queue while the current process reads its limits.
*/
LWLockAcquire(ResQueueLock, LW_EXCLUSIVE);
ResQueue resQueue = ResQueueHashFind(queueId);
LWLockRelease(ResQueueLock);
Assert(resQueue);
int numSlots = (int) ceil(resQueue->limits[RES_COUNT_LIMIT].threshold_value);
double costLimit = (double) resQueue->limits[RES_COST_LIMIT].threshold_value;
double planCost = stmt->planTree->total_cost;
if (planCost < 1.0)
planCost = 1.0;
Assert(planCost > 0.0);
if (LogResManagerMemory())
{
elog(GP_RESMANAGER_MEMORY_LOG_LEVEL, "numslots: %d, costlimit: %f", numSlots, costLimit);
}
if (numSlots < 1)
{
/** there is no statement limit set */
numSlots = 1;
}
if (costLimit < 0.0)
{
/** there is no cost limit set */
costLimit = planCost;
}
double minRatio = Min(1.0 / (double) numSlots, planCost / costLimit);
minRatio = Min(minRatio, 1.0);
if (LogResManagerMemory())
{
elog(GP_RESMANAGER_MEMORY_LOG_LEVEL, "slotratio: %0.3f, costratio: %0.3f, minratio: %0.3f",
1.0/ (double) numSlots, planCost / costLimit, minRatio);
}
uint64 queryMem = (uint64) resqLimitBytes * minRatio;
/**
* If user requests more using statement_mem, grant that.
*/
if (queryMem < (uint64) statement_mem * 1024L)
{
queryMem = (uint64) statement_mem * 1024L;
}
return queryMem;
}
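/*
 * Worked example (illustrative): a queue with a 1 GB memory limit, an
 * active-statements threshold (numSlots) of 10 and no cost limit gives
 * minRatio = Min(1/10, planCost/planCost) = 0.1, so queryMem =
 * 0.1 * 1073741824 = 107374182 bytes (about 102 MB) - unless the user's
 * statement_mem asks for more, in which case statement_mem wins.
 */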
/**
* How much memory should superuser queries get?
*/
static uint64 ResourceQueueGetSuperuserQueryMemoryLimit(void)
{
Assert(superuser());
return (uint64) statement_mem * 1024L;
}