greenplumn aggregate 源码
greenplumn aggregate 代码
文件路径:/gpcontrib/orafce/aggregate.c
#include "postgres.h"
#include <math.h>
#include "funcapi.h"
#include "builtins.h"
#include "lib/stringinfo.h"
#include "utils/builtins.h"
#include "orafce.h"
PG_FUNCTION_INFO_V1(orafce_listagg1_transfn);
PG_FUNCTION_INFO_V1(orafce_wm_concat_transfn);
PG_FUNCTION_INFO_V1(orafce_listagg2_transfn);
PG_FUNCTION_INFO_V1(orafce_listagg_finalfn);
PG_FUNCTION_INFO_V1(orafce_median4_transfn);
PG_FUNCTION_INFO_V1(orafce_median4_finalfn);
PG_FUNCTION_INFO_V1(orafce_median8_transfn);
PG_FUNCTION_INFO_V1(orafce_median8_finalfn);
typedef struct
{
int alen; /* allocated length */
int nextlen; /* next allocated length */
int nelems; /* number of valid entries */
union
{
float4 *float4_values;
float8 *float8_values;
} d;
} MedianState;
int orafce_float4_cmp(const void *a, const void *b);
int orafce_float8_cmp(const void *a, const void *b);
/****************************************************************
* listagg
*
* Concates values and returns string.
*
* Syntax:
* FUNCTION listagg(string varchar, delimiter varchar = '')
* RETURNS varchar;
*
* Note: any NULL value is ignored.
*
****************************************************************/
/* subroutine to initialize state */
static StringInfo
makeStringAggState(FunctionCallInfo fcinfo)
{
StringInfo state;
MemoryContext aggcontext;
MemoryContext oldcontext;
if (!AggCheckCallContext(fcinfo, &aggcontext))
{
/* cannot be called directly because of internal-type argument */
elog(ERROR, "listagg_transfn called in non-aggregate context");
}
/*
* Create state in aggregate context. It'll stay there across subsequent
* calls.
*/
oldcontext = MemoryContextSwitchTo(aggcontext);
state = makeStringInfo();
MemoryContextSwitchTo(oldcontext);
return state;
}
static void
appendStringInfoText(StringInfo str, const text *t)
{
appendBinaryStringInfo(str, VARDATA_ANY(t), VARSIZE_ANY_EXHDR(t));
}
Datum
orafce_listagg1_transfn(PG_FUNCTION_ARGS)
{
StringInfo state;
state = PG_ARGISNULL(0) ? NULL : (StringInfo) PG_GETARG_POINTER(0);
/* Append the element unless null. */
if (!PG_ARGISNULL(1))
{
if (state == NULL)
state = makeStringAggState(fcinfo);
appendStringInfoText(state, PG_GETARG_TEXT_PP(1)); /* value */
}
/*
* The transition type for string_agg() is declared to be "internal",
* which is a pass-by-value type the same size as a pointer.
*/
PG_RETURN_POINTER(state);
}
Datum
orafce_wm_concat_transfn(PG_FUNCTION_ARGS)
{
StringInfo state;
state = PG_ARGISNULL(0) ? NULL : (StringInfo) PG_GETARG_POINTER(0);
/* Append the element unless null. */
if (!PG_ARGISNULL(1))
{
if (state == NULL)
state = makeStringAggState(fcinfo);
else
appendStringInfoChar(state, ',');
appendStringInfoText(state, PG_GETARG_TEXT_PP(1)); /* value */
}
/*
* The transition type for string_agg() is declared to be "internal",
* which is a pass-by-value type the same size as a pointer.
*/
PG_RETURN_POINTER(state);
}
Datum
orafce_listagg2_transfn(PG_FUNCTION_ARGS)
{
return string_agg_transfn(fcinfo);
}
Datum
orafce_listagg_finalfn(PG_FUNCTION_ARGS)
{
return string_agg_finalfn(fcinfo);
}
static MedianState *
accumFloat4(MedianState *mstate, float4 value, MemoryContext aggcontext)
{
MemoryContext oldcontext;
if (mstate == NULL)
{
/* First call - initialize */
oldcontext = MemoryContextSwitchTo(aggcontext);
mstate = palloc(sizeof(MedianState));
mstate->alen = 1024;
mstate->nextlen = 2 * 1024;
mstate->nelems = 0;
mstate->d.float4_values = palloc(mstate->alen * sizeof(float4));
MemoryContextSwitchTo(oldcontext);
}
else
{
/* enlarge float4_values if needed */
if (mstate->nelems >= mstate->alen)
{
int newlen = mstate->nextlen;
oldcontext = MemoryContextSwitchTo(aggcontext);
mstate->nextlen += mstate->alen;
mstate->alen = newlen;
mstate->d.float4_values = repalloc(mstate->d.float4_values,
mstate->alen * sizeof(float4));
MemoryContextSwitchTo(oldcontext);
}
}
mstate->d.float4_values[mstate->nelems++] = value;
return mstate;
}
static MedianState *
accumFloat8(MedianState *mstate, float8 value, MemoryContext aggcontext)
{
MemoryContext oldcontext;
if (mstate == NULL)
{
/* First call - initialize */
oldcontext = MemoryContextSwitchTo(aggcontext);
mstate = palloc(sizeof(MedianState));
mstate->alen = 1024;
mstate->nextlen = 2 * 1024;
mstate->nelems = 0;
mstate->d.float8_values = palloc(mstate->alen * sizeof(float8));
MemoryContextSwitchTo(oldcontext);
}
else
{
/* enlarge float4_values if needed */
if (mstate->nelems >= mstate->alen)
{
int newlen = mstate->nextlen;
oldcontext = MemoryContextSwitchTo(aggcontext);
mstate->nextlen += mstate->alen;
mstate->alen = newlen;
mstate->d.float8_values = repalloc(mstate->d.float8_values,
mstate->alen * sizeof(float8));
MemoryContextSwitchTo(oldcontext);
}
}
mstate->d.float8_values[mstate->nelems++] = value;
return mstate;
}
Datum
orafce_median4_transfn(PG_FUNCTION_ARGS)
{
MemoryContext aggcontext;
MedianState *state = NULL;
float4 elem;
if (!AggCheckCallContext(fcinfo, &aggcontext))
{
/* cannot be called directly because of internal-type argument */
elog(ERROR, "median4_transfn called in non-aggregate context");
}
state = PG_ARGISNULL(0) ? NULL : (MedianState *) PG_GETARG_POINTER(0);
if (PG_ARGISNULL(1))
PG_RETURN_POINTER(state);
elem = PG_GETARG_FLOAT4(1);
state = accumFloat4(state, elem, aggcontext);
PG_RETURN_POINTER(state);
}
int
orafce_float4_cmp(const void *_a, const void *_b)
{
float4 a = *((float4 *) _a);
float4 b = *((float4 *) _b);
if (isnan(a))
{
if (isnan(b))
return 0;
else
return 1;
}
else if (isnan(b))
{
return -1;
}
else
{
if (a > b)
return 1;
else if (a < b)
return -1;
else
return 0;
}
}
Datum
orafce_median4_finalfn(PG_FUNCTION_ARGS)
{
MedianState *state = NULL;
int lidx;
int hidx;
float4 result;
if (PG_ARGISNULL(0))
PG_RETURN_NULL();
state = (MedianState *) PG_GETARG_POINTER(0);
if (state == NULL)
PG_RETURN_NULL();
qsort(state->d.float4_values, state->nelems, sizeof(float4), orafce_float4_cmp);
lidx = state->nelems / 2 + 1 - 1;
hidx = (state->nelems + 1) / 2 - 1;
if (lidx == hidx)
result = state->d.float4_values[lidx];
else
result = (state->d.float4_values[lidx] + state->d.float4_values[hidx]) / 2.0f;
PG_RETURN_FLOAT4(result);
}
Datum
orafce_median8_transfn(PG_FUNCTION_ARGS)
{
MemoryContext aggcontext;
MedianState *state = NULL;
float8 elem;
if (!AggCheckCallContext(fcinfo, &aggcontext))
{
/* cannot be called directly because of internal-type argument */
elog(ERROR, "median4_transfn called in non-aggregate context");
}
state = PG_ARGISNULL(0) ? NULL : (MedianState *) PG_GETARG_POINTER(0);
if (PG_ARGISNULL(1))
PG_RETURN_POINTER(state);
elem = PG_GETARG_FLOAT8(1);
state = accumFloat8(state, elem, aggcontext);
PG_RETURN_POINTER(state);
}
int
orafce_float8_cmp(const void *_a, const void *_b)
{
float8 a = *((float8 *) _a);
float8 b = *((float8 *) _b);
if (isnan(a))
{
if (isnan(b))
return 0;
else
return 1;
}
else if (isnan(b))
{
return -1;
}
else
{
if (a > b)
return 1;
else if (a < b)
return -1;
else
return 0;
}
}
Datum
orafce_median8_finalfn(PG_FUNCTION_ARGS)
{
MedianState *state = NULL;
int lidx;
int hidx;
float8 result;
if (PG_ARGISNULL(0))
PG_RETURN_NULL();
state = (MedianState *) PG_GETARG_POINTER(0);
if (state == NULL)
PG_RETURN_NULL();
qsort(state->d.float8_values, state->nelems, sizeof(float8), orafce_float8_cmp);
lidx = state->nelems / 2 + 1 - 1;
hidx = (state->nelems + 1) / 2 - 1;
if (lidx == hidx)
result = state->d.float8_values[lidx];
else
result = (state->d.float8_values[lidx] + state->d.float8_values[hidx]) / 2.0;
PG_RETURN_FLOAT8(result);
}
相关信息
相关文章
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦