greenplum resgroup_helper 源码
greenplum resgroup_helper 代码
文件路径:/src/backend/utils/resgroup/resgroup_helper.c
/*-------------------------------------------------------------------------
*
* resgroup_helper.c
* Helper functions for resource group.
*
* Copyright (c) 2017-Present VMware, Inc. or its affiliates.
*
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/genam.h"
#include "access/table.h"
#include "funcapi.h"
#include "libpq-fe.h"
#include "miscadmin.h"
#include "catalog/pg_resgroup.h"
#include "cdb/cdbdisp_query.h"
#include "cdb/cdbdispatchresult.h"
#include "cdb/cdbvars.h"
#include "commands/resgroupcmds.h"
#include "storage/procarray.h"
#include "utils/builtins.h"
#include "utils/datetime.h"
#include "utils/resgroup.h"
#include "utils/resgroup-ops.h"
#include "utils/resource_manager.h"
/*
 * One output row of pg_resgroup_get_status().
 *
 * groupId holds the resource group's oid stored in a Datum (filled from the
 * pg_resgroup catalog scan).  cpuUsage and memUsage accumulate the JSON text
 * built by getResUsage(); they are returned as the cpu_usage and
 * memory_usage columns.
 */
typedef struct ResGroupStat
{
Datum groupId;
StringInfo cpuUsage;
StringInfo memUsage;
} ResGroupStat;
/*
 * Cross-call state of pg_resgroup_get_status().
 *
 * groups[] is declared with a single element but is allocated with room for
 * MaxResourceGroups entries (the allocation size computed in
 * pg_resgroup_get_status() relies on this exact declaration, so do not turn
 * it into a flexible array member without updating that computation).
 * nGroups is the number of entries actually filled in.
 */
typedef struct ResGroupStatCtx
{
int nGroups;
ResGroupStat groups[1];
} ResGroupStatCtx;
/* Forward declarations of file-local helpers. */
static void calcCpuUsage(StringInfoData *str,
int64 usageBegin, TimestampTz timestampBegin,
int64 usageEnd, TimestampTz timestampEnd);
static void getResUsage(ResGroupStatCtx *ctx, Oid inGroupId);
static void dumpResGroupInfo(StringInfo str);
/*
 * Append one JSON member of the form "<segindex>":<percent> to str.
 *
 * <percent> is ResGroupOps_ConvertCpuUsageToPercent() applied to the CPU
 * usage consumed between the two samples (usageEnd - usageBegin) and the
 * wall-clock time elapsed between the two timestamps, expressed in
 * microseconds.  The unit of the usage samples is whatever
 * ResGroupOps_GetCpuUsage() reports (NOTE(review): not visible here).
 */
static void
calcCpuUsage(StringInfoData *str,
			 int64 usageBegin, TimestampTz timestampBegin,
			 int64 usageEnd, TimestampTz timestampEnd)
{
	int64		cpuDelta = usageEnd - usageBegin;
	long		elapsedSecs;
	int			elapsedUsecs;
	int64		wallDelta;

	TimestampDifference(timestampBegin, timestampEnd,
						&elapsedSecs, &elapsedUsecs);
	/* wall-clock interval between the two samples, in microseconds */
	wallDelta = elapsedSecs * 1000000 + elapsedUsecs;

	appendStringInfo(str, "\"%d\":%.2f",
					 GpIdentity.segindex,
					 ResGroupOps_ConvertCpuUsageToPercent(cpuDelta, wallDelta));
}
/*
 * Get resource usage.
 *
 * On QD this function dispatches the request to all QEs, collecting both
 * the QEs' and the QD's resource usage.
 *
 * On QE this function only collects the resource usage of itself.
 *
 * Memory & cpu usage are returned in JSON format, appended to
 * row->cpuUsage / row->memUsage of every entry in ctx->groups:
 *   - on a QE each string receives a single member "<segindex>":<value>;
 *   - on the QD each string receives a complete object
 *     {"<qd segindex>":<value>, <member from QE 1>, <member from QE 2>, ...}.
 *
 * The QD matches QE rows to ctx->groups by position, so every participant
 * must produce its rows in the same group order (the caller sorts
 * ctx->groups with compareRow()).  Raises an ERROR if a QE does not return
 * exactly one row per group.
 */
static void
getResUsage(ResGroupStatCtx *ctx, Oid inGroupId)
{
	int64	   *usages;
	TimestampTz *timestamps;
	int			i,
				j;

	usages = palloc(sizeof(*usages) * ctx->nGroups);
	timestamps = palloc(sizeof(*timestamps) * ctx->nGroups);

	/* Take the starting CPU usage sample of every group. */
	for (j = 0; j < ctx->nGroups; j++)
	{
		ResGroupStat *row = &ctx->groups[j];
		Oid			groupId = DatumGetObjectId(row->groupId);

		usages[j] = ResGroupOps_GetCpuUsage(groupId);
		timestamps[j] = GetCurrentTimestamp();
	}

	if (Gp_role == GP_ROLE_DISPATCH)
	{
		CdbPgResults cdb_pgresults = {NULL, 0};
		StringInfoData buffer;

		initStringInfo(&buffer);
		appendStringInfo(&buffer,
						 "SELECT groupid, cpu_usage, memory_usage "
						 "FROM pg_resgroup_get_status(%u)",
						 inGroupId);

		CdbDispatchCommand(buffer.data, DF_WITH_SNAPSHOT, &cdb_pgresults);

		if (cdb_pgresults.numResults == 0)
			elog(ERROR, "pg_resgroup_get_status() didn't get back any resource statistic from the segDBs");

		for (i = 0; i < cdb_pgresults.numResults; i++)
		{
			struct pg_result *pg_result = cdb_pgresults.pg_results[i];
			int			ntuples;

			/*
			 * Any error here should have propagated into errbuf, so we
			 * shouldn't ever see anything other that tuples_ok here. But,
			 * check to be sure.
			 */
			if (PQresultStatus(pg_result) != PGRES_TUPLES_OK)
			{
				cdbdisp_clearCdbPgResults(&cdb_pgresults);
				elog(ERROR, "pg_resgroup_get_status(): resultStatus not tuples_Ok");
			}

			/*
			 * Rows are read below by position, one per group.  A segment
			 * that reports a different number of rows (e.g. zero rows) would
			 * make PQgetvalue() index rows that do not exist.  An Assert
			 * alone is compiled out of production builds, so check at
			 * runtime.  Capture the count before the results are freed.
			 */
			ntuples = PQntuples(pg_result);
			if (ntuples != ctx->nGroups)
			{
				cdbdisp_clearCdbPgResults(&cdb_pgresults);
				elog(ERROR, "pg_resgroup_get_status(): expected %d rows from a segment, got %d",
					 ctx->nGroups, ntuples);
			}

			for (j = 0; j < ctx->nGroups; j++)
			{
				const char *result;
				ResGroupStat *row = &ctx->groups[j];
				Oid			groupId = atooid(PQgetvalue(pg_result, j, 0));

				Assert(groupId == DatumGetObjectId(row->groupId));

				/*
				 * First QE result for this group: open the JSON objects and
				 * put the QD's own numbers (second CPU sample taken now) in
				 * front of the QE members.
				 */
				if (row->memUsage->len == 0)
				{
					Datum		d = ResGroupGetStat(groupId, RES_GROUP_STAT_MEM_USAGE);

					row->groupId = groupId;

					appendStringInfo(row->memUsage, "{\"%d\":%s",
									 GpIdentity.segindex, DatumGetCString(d));

					appendStringInfo(row->cpuUsage, "{");
					calcCpuUsage(row->cpuUsage, usages[j], timestamps[j],
								 ResGroupOps_GetCpuUsage(groupId),
								 GetCurrentTimestamp());
				}

				/* Each QE returns a ready-made "<segindex>":<value> member. */
				result = PQgetvalue(pg_result, j, 1);
				appendStringInfo(row->cpuUsage, ", %s", result);

				result = PQgetvalue(pg_result, j, 2);
				appendStringInfo(row->memUsage, ", %s", result);

				/* Close the JSON objects after the last QE. */
				if (i == cdb_pgresults.numResults - 1)
				{
					appendStringInfoChar(row->cpuUsage, '}');
					appendStringInfoChar(row->memUsage, '}');
				}
			}
		}

		cdbdisp_clearCdbPgResults(&cdb_pgresults);
	}
	else
	{
		/* Let some time pass between the two CPU samples (300 ms). */
		pg_usleep(300000);

		for (j = 0; j < ctx->nGroups; j++)
		{
			ResGroupStat *row = &ctx->groups[j];
			Oid			groupId = DatumGetObjectId(row->groupId);
			Datum		d = ResGroupGetStat(groupId, RES_GROUP_STAT_MEM_USAGE);

			appendStringInfo(row->memUsage, "\"%d\":%s",
							 GpIdentity.segindex, DatumGetCString(d));

			calcCpuUsage(row->cpuUsage, usages[j], timestamps[j],
						 ResGroupOps_GetCpuUsage(groupId),
						 GetCurrentTimestamp());
		}
	}
}
/*
 * qsort() comparator ordering ResGroupStat rows by ascending group id.
 *
 * groupId is an unsigned Datum holding an oid.  Subtracting two of them and
 * truncating the result to int (as a naive "a - b" comparator would) yields
 * the wrong sign once the ids differ by 2^31 or more, which breaks the
 * consistency qsort() requires.  Compare explicitly instead.
 */
static int
compareRow(const void *ptr1, const void *ptr2)
{
	const ResGroupStat *row1 = (const ResGroupStat *) ptr1;
	const ResGroupStat *row2 = (const ResGroupStat *) ptr2;

	if (row1->groupId < row2->groupId)
		return -1;
	if (row1->groupId > row2->groupId)
		return 1;
	return 0;
}
/*
 * Get status of resource groups
 *
 * SQL-callable set-returning function.  Argument 0 is a resource group oid;
 * InvalidOid selects every group in pg_resgroup.  One row is returned per
 * selected group with the columns declared below.  When resource groups are
 * not activated, no rows are returned.
 *
 * All cpu/memory usage data is gathered on the first call (via getResUsage(),
 * which on the QD also queries every QE) while pg_resgroup is held in
 * ExclusiveLock.  Later calls only format the stored data.  In utility mode
 * the per-group counters (columns 2-6) are returned as NULL.
 */
Datum
pg_resgroup_get_status(PG_FUNCTION_ARGS)
{
	FuncCallContext *funcctx;
	ResGroupStatCtx *ctx;

	if (SRF_IS_FIRSTCALL())
	{
		MemoryContext oldcontext;
		TupleDesc	tupdesc;
		int			nattr = 8;

		funcctx = SRF_FIRSTCALL_INIT();

		/* Everything that must survive across calls goes here. */
		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);

		tupdesc = CreateTemplateTupleDesc(nattr);
		TupleDescInitEntry(tupdesc, (AttrNumber) 1, "groupid", OIDOID, -1, 0);
		TupleDescInitEntry(tupdesc, (AttrNumber) 2, "num_running", INT4OID, -1, 0);
		TupleDescInitEntry(tupdesc, (AttrNumber) 3, "num_queueing", INT4OID, -1, 0);
		TupleDescInitEntry(tupdesc, (AttrNumber) 4, "num_queued", INT4OID, -1, 0);
		TupleDescInitEntry(tupdesc, (AttrNumber) 5, "num_executed", INT4OID, -1, 0);
		TupleDescInitEntry(tupdesc, (AttrNumber) 6, "total_queue_duration", INTERVALOID, -1, 0);
		TupleDescInitEntry(tupdesc, (AttrNumber) 7, "cpu_usage", JSONOID, -1, 0);
		TupleDescInitEntry(tupdesc, (AttrNumber) 8, "memory_usage", JSONOID, -1, 0);
		funcctx->tuple_desc = BlessTupleDesc(tupdesc);

		if (IsResGroupActivated())
		{
			Relation	pg_resgroup_rel;
			SysScanDesc sscan;
			HeapTuple	tuple;
			Oid			inGroupId = PG_GETARG_OID(0);

			/* ResGroupStatCtx already embeds one ResGroupStat. */
			int			ctxsize = sizeof(ResGroupStatCtx) +
				sizeof(ResGroupStat) * (MaxResourceGroups - 1);

			funcctx->user_fctx = palloc(ctxsize);
			ctx = (ResGroupStatCtx *) funcctx->user_fctx;

			/*
			 * others may be creating/dropping resource group concurrently,
			 * block until creating/dropping finish to avoid inconsistent
			 * resource group metadata
			 */
			pg_resgroup_rel = table_open(ResGroupRelationId, ExclusiveLock);

			sscan = systable_beginscan(pg_resgroup_rel, InvalidOid, false,
									   NULL, 0, NULL);
			while (HeapTupleIsValid(tuple = systable_getnext(sscan)))
			{
				Oid			oid = ((Form_pg_resgroup) GETSTRUCT(tuple))->oid;

				if (inGroupId != InvalidOid && inGroupId != oid)
					continue;

				/*
				 * ctx->groups only has room for MaxResourceGroups entries.
				 * An Assert alone would be compiled out of production builds
				 * and let us write past the allocation, so check at runtime.
				 */
				if (funcctx->max_calls >= MaxResourceGroups)
					elog(ERROR, "pg_resgroup_get_status(): more than %d resource groups found",
						 (int) MaxResourceGroups);

				ctx->groups[funcctx->max_calls].cpuUsage = makeStringInfo();
				ctx->groups[funcctx->max_calls].memUsage = makeStringInfo();
				ctx->groups[funcctx->max_calls++].groupId = oid;

				/* A specific group was requested and found; stop scanning. */
				if (inGroupId != InvalidOid)
					break;
			}
			systable_endscan(sscan);

			ctx->nGroups = funcctx->max_calls;

			/*
			 * getResUsage() on the QD pairs QE rows with ctx->groups by
			 * position, so QD and QEs must use the same ordering.
			 */
			qsort(ctx->groups, ctx->nGroups, sizeof(ctx->groups[0]), compareRow);

			if (ctx->nGroups > 0)
				getResUsage(ctx, inGroupId);

			table_close(pg_resgroup_rel, ExclusiveLock);
		}

		MemoryContextSwitchTo(oldcontext);
	}

	/* stuff done on every call of the function */
	funcctx = SRF_PERCALL_SETUP();
	ctx = (ResGroupStatCtx *) funcctx->user_fctx;

	if (funcctx->call_cntr < funcctx->max_calls)
	{
		/* for each row */
		Datum		values[8];
		bool		nulls[8];
		HeapTuple	tuple;
		Oid			groupId;
		ResGroupStat *row = &ctx->groups[funcctx->call_cntr];

		MemSet(values, 0, sizeof(values));
		MemSet(nulls, 0, sizeof(nulls));

		values[0] = row->groupId;
		groupId = DatumGetObjectId(values[0]);

		if (Gp_role == GP_ROLE_UTILITY)
		{
			/* In utility mode the per-group counters are reported as NULL. */
			nulls[1] = true;
			nulls[2] = true;
			nulls[3] = true;
			nulls[4] = true;
			nulls[5] = true;
		}
		else
		{
			values[1] = ResGroupGetStat(groupId, RES_GROUP_STAT_NRUNNING);
			values[2] = ResGroupGetStat(groupId, RES_GROUP_STAT_NQUEUEING);
			values[3] = ResGroupGetStat(groupId, RES_GROUP_STAT_TOTAL_QUEUED);
			values[4] = ResGroupGetStat(groupId, RES_GROUP_STAT_TOTAL_EXECUTED);
			values[5] = ResGroupGetStat(groupId, RES_GROUP_STAT_TOTAL_QUEUE_TIME);
		}

		/* JSON text gathered by getResUsage() on the first call. */
		values[6] = CStringGetTextDatum(row->cpuUsage->data);
		values[7] = CStringGetTextDatum(row->memUsage->data);

		tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
		SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(tuple));
	}
	else
	{
		/* nothing left */
		SRF_RETURN_DONE(funcctx);
	}
}
/*
 * Get status of resource groups in key-value style
 *
 * SQL-callable set-returning function with one text argument.  When the
 * argument starts with "dump" (superusers only), a single row is returned:
 * "groupid" and "prop" are NULL and "value" holds the text produced by
 * dumpResGroupInfo().  Any other argument yields an empty result.
 *
 * The dump is produced only on the call that actually returns the row.  On
 * the QD, dumpResGroupInfo() dispatches a query to every QE and takes
 * ResGroupLock, so it must not be repeated on the final call that merely
 * reports end-of-set.
 */
Datum
pg_resgroup_get_status_kv(PG_FUNCTION_ARGS)
{
	FuncCallContext *funcctx;
	bool		do_dump;

	do_dump = (strncmp(text_to_cstring(PG_GETARG_TEXT_P(0)), "dump", 4) == 0);

	/* Only super user can call this function with para=dump. */
	if (do_dump && !superuser())
	{
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
				 errmsg("only superusers can call this function")));
	}

	if (SRF_IS_FIRSTCALL())
	{
		MemoryContext oldcontext;
		TupleDesc	tupdesc;
		int			nattr = 3;

		funcctx = SRF_FIRSTCALL_INIT();
		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);

		tupdesc = CreateTemplateTupleDesc(nattr);
		TupleDescInitEntry(tupdesc, (AttrNumber) 1, "groupid", OIDOID, -1, 0);
		TupleDescInitEntry(tupdesc, (AttrNumber) 2, "prop", TEXTOID, -1, 0);
		TupleDescInitEntry(tupdesc, (AttrNumber) 3, "value", TEXTOID, -1, 0);
		funcctx->tuple_desc = BlessTupleDesc(tupdesc);

		/* A dump yields exactly one row; anything else yields none. */
		funcctx->max_calls = do_dump ? 1 : 0;

		MemoryContextSwitchTo(oldcontext);
	}

	/* stuff done on every call of the function */
	funcctx = SRF_PERCALL_SETUP();

	if (do_dump && funcctx->call_cntr < funcctx->max_calls)
	{
		StringInfoData str;
		Datum		values[3];
		bool		nulls[3];
		HeapTuple	tuple;

		/* dump info in QD and collect info from QEs to form str. */
		initStringInfo(&str);
		dumpResGroupInfo(&str);

		MemSet(values, 0, sizeof(values));
		MemSet(nulls, 0, sizeof(nulls));

		nulls[0] = nulls[1] = true;
		values[2] = CStringGetTextDatum(str.data);

		tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
		SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(tuple));
	}

	/* nothing left */
	SRF_RETURN_DONE(funcctx);
}
/*
 * Append a dump of resource group state to str.
 *
 * On the QD the result is {"info":[<QD dump>,<QE dump>,...]}: the QD's own
 * ResGroupDumpInfo() output followed by the "value" column each QE returns
 * from pg_resgroup_get_status_kv('dump').  Elsewhere only the local
 * ResGroupDumpInfo() output is appended.
 *
 * ResGroupLock is held in exclusive mode while the local dump is produced.
 * Raises an ERROR if no QE answers, if a QE result is not a tuple set, or
 * if a QE does not return exactly one row.
 */
static void
dumpResGroupInfo(StringInfo str)
{
	if (Gp_role == GP_ROLE_DISPATCH)
	{
		int			i;
		StringInfoData str_qd;
		StringInfoData buffer;
		CdbPgResults cdb_pgresults = {NULL, 0};
		struct pg_result *pg_result;

		initStringInfo(&str_qd);
		initStringInfo(&buffer);
		appendStringInfoString(&buffer,
							   "select * from pg_resgroup_get_status_kv('dump');");

		CdbDispatchCommand(buffer.data, 0, &cdb_pgresults);

		if (cdb_pgresults.numResults == 0)
			elog(ERROR, "dumpResGroupInfo didn't get back any results from the segDBs");

		LWLockAcquire(ResGroupLock, LW_EXCLUSIVE);
		ResGroupDumpInfo(&str_qd);
		LWLockRelease(ResGroupLock);

		/*
		 * append all qes and qd together to form str; the trailing comma is
		 * safe because numResults > 0 was checked above
		 */
		appendStringInfo(str, "{\"info\":[%s,", str_qd.data);

		for (i = 0; i < cdb_pgresults.numResults; i++)
		{
			int			ntuples;

			pg_result = cdb_pgresults.pg_results[i];

			if (PQresultStatus(pg_result) != PGRES_TUPLES_OK)
			{
				cdbdisp_clearCdbPgResults(&cdb_pgresults);
				elog(ERROR, "pg_resgroup_get_status_kv(): resultStatus not tuples_Ok");
			}

			/*
			 * Row 0 is read unconditionally below; an Assert alone is
			 * compiled out of production builds, so verify at runtime.
			 * Capture the count before the results are freed.
			 */
			ntuples = PQntuples(pg_result);
			if (ntuples != 1)
			{
				cdbdisp_clearCdbPgResults(&cdb_pgresults);
				elog(ERROR, "pg_resgroup_get_status_kv(): expected 1 row from a segment, got %d",
					 ntuples);
			}

			if (i > 0)
				appendStringInfoChar(str, ',');
			appendStringInfoString(str, PQgetvalue(pg_result, 0, 2));
		}

		appendStringInfoString(str, "]}");
		cdbdisp_clearCdbPgResults(&cdb_pgresults);
	}
	else
	{
		LWLockAcquire(ResGroupLock, LW_EXCLUSIVE);
		ResGroupDumpInfo(str);
		LWLockRelease(ResGroupLock);
	}
}
/*
 * Report, as a two-column composite row, the current memory usage of a
 * session and the memory available in a resource group.
 *
 * Argument 0 is the session id, argument 1 the resource group oid.  Column 1
 * is ResGroupGetSessionMemUsage(session), column 2 is
 * ResGroupGetGroupAvailableMem(group); both are int32 in whatever unit those
 * helpers use (NOTE(review): unit not visible here).  Both values are
 * computed before the result type is checked.
 */
Datum
pg_resgroup_check_move_query(PG_FUNCTION_ARGS)
{
	int			sessionId = PG_GETARG_INT32(0);
	Oid			groupId = PG_GETARG_OID(1);
	int32		sessionMem;
	int32		availMem;
	TupleDesc	resultDesc;
	Datum		values[2];
	bool		nulls[2] = {false, false};
	HeapTuple	resultTuple;

	sessionMem = ResGroupGetSessionMemUsage(sessionId);
	availMem = ResGroupGetGroupAvailableMem(groupId);

	if (get_call_result_type(fcinfo, NULL, &resultDesc) != TYPEFUNC_COMPOSITE)
		elog(ERROR, "return type must be a row type");
	resultDesc = BlessTupleDesc(resultDesc);

	values[0] = Int32GetDatum(sessionMem);
	values[1] = Int32GetDatum(availMem);

	resultTuple = heap_form_tuple(resultDesc, values, nulls);
	PG_RETURN_DATUM(HeapTupleGetDatum(resultTuple));
}
/*
 * move a query to a resource group
 *
 * Requires resource groups to be enabled and a superuser caller.
 *
 * On the QD, argument 0 is a backend pid and argument 1 the target resource
 * group name.  The pid is mapped to its session; errors are raised if no
 * session is found or the session has no current group.  When the session is
 * already in the target group nothing is moved.
 *
 * On a QE, argument 0 is a session id and argument 1 the target group name;
 * ResGroupSignalMoveQuery() is invoked for that session.
 *
 * Any other role does nothing.  Returns true unless an error is raised.
 */
Datum
pg_resgroup_move_query(PG_FUNCTION_ARGS)
{
	const char *groupName;
	Oid			groupId;
	int			sessionId;

	if (!IsResGroupEnabled())
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
				 (errmsg("resource group is not enabled"))));

	if (!superuser())
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
				 (errmsg("must be superuser to move query"))));

	switch (Gp_role)
	{
		case GP_ROLE_DISPATCH:
			{
				pid_t		targetPid = PG_GETARG_INT32(0);
				Oid			currentGroupId;

				groupName = text_to_cstring(PG_GETARG_TEXT_PP(1));
				groupId = get_resgroup_oid(groupName, false);

				sessionId = GetSessionIdByPid(targetPid);
				if (sessionId == -1)
					ereport(ERROR,
							(errcode(ERRCODE_UNDEFINED_OBJECT),
							 (errmsg("cannot find process: %d", targetPid))));

				currentGroupId = ResGroupGetGroupIdBySessionId(sessionId);
				if (currentGroupId == InvalidOid)
					ereport(ERROR,
							(errcode(ERRCODE_UNDEFINED_OBJECT),
							 (errmsg("process %d is in IDLE state", targetPid))));

				/* Only move when the session is not already in the target. */
				if (currentGroupId != groupId)
					ResGroupMoveQuery(sessionId, groupId, groupName);
				break;
			}

		case GP_ROLE_EXECUTE:
			sessionId = PG_GETARG_INT32(0);
			groupName = text_to_cstring(PG_GETARG_TEXT_PP(1));
			groupId = get_resgroup_oid(groupName, false);
			ResGroupSignalMoveQuery(sessionId, NULL, groupId);
			break;

		default:
			/* Other roles (such as GP_ROLE_UTILITY) do nothing. */
			break;
	}

	PG_RETURN_BOOL(true);
}
相关信息
相关文章
greenplum resgroup-ops-dummy 源码
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦