greenplumn pxf_header 源码
greenplumn pxf_header 代码
文件路径:/gpcontrib/pxf_fdw/pxf_header.c
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "pxf_filter.h"
#include "pxf_header.h"
#include "access/external.h"
#include "access/url.h"
#include "utils/builtins.h"
#include "commands/defrem.h"
#include "utils/formatting.h"
#include "utils/timestamp.h"
#include "utils/syscache.h"
/* helper function declarations */
static void AddAlignmentSizeHttpHeader(CHURL_HEADERS headers);
static void AddTupleDescriptionToHttpHeader(CHURL_HEADERS headers, Relation rel);
static void AddOptionsToHttpHeader(CHURL_HEADERS headers, List *options);
static void AddProjectionDescHttpHeader(CHURL_HEADERS headers, List *retrieved_attrs);
static void AddProjectionIndexHeader(CHURL_HEADERS headers, int attno, char *long_number);
static char *NormalizeKeyName(const char *key);
static char *TypeOidGetTypename(Oid typid);
/*
* Add key/value pairs to connection header.
* These values are the context of the query and used
* by the remote component.
*/
void
BuildHttpHeaders(CHURL_HEADERS headers,
PxfOptions *options,
Relation relation,
char *filter_string,
List *retrieved_attrs)
{
extvar_t ev;
char pxfPortString[sizeof(int32) * 8];
if (relation != NULL)
{
/* Record fields - name and type of each field */
AddTupleDescriptionToHttpHeader(headers, relation);
}
if (retrieved_attrs != NULL)
{
/* add the list of attrs to the projection desc http headers */
AddProjectionDescHttpHeader(headers, retrieved_attrs);
}
/* GP cluster configuration */
external_set_env_vars(&ev, "pxf_fdw", false, NULL, NULL, false, 0);
/*
* make sure that user identity is known and set, otherwise impersonation
* by PXF will be impossible
*/
if (!ev.GP_USER || !ev.GP_USER[0])
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("user identity is unknown")));
churl_headers_append(headers, "X-GP-USER", ev.GP_USER);
churl_headers_append(headers, "X-GP-SEGMENT-ID", ev.GP_SEGMENT_ID);
churl_headers_append(headers, "X-GP-SEGMENT-COUNT", ev.GP_SEGMENT_COUNT);
churl_headers_append(headers, "X-GP-XID", ev.GP_XID);
AddAlignmentSizeHttpHeader(headers);
/* Convert the number of attributes to a string */
pg_ltoa(options->pxf_port, pxfPortString);
/* headers for uri data */
churl_headers_append(headers, "X-GP-URL-HOST", options->pxf_host);
churl_headers_append(headers, "X-GP-URL-PORT", pxfPortString);
churl_headers_append(headers, "X-GP-OPTIONS-PROFILE", options->profile);
/* only text format is supported for FDW */
churl_headers_append(headers, "X-GP-FORMAT", "TEXT");
churl_headers_append(headers, "X-GP-DATA-DIR", options->resource);
churl_headers_append(headers, "X-GP-OPTIONS-SERVER", options->server);
/* extra options */
AddOptionsToHttpHeader(headers, options->options);
/* copy options */
AddOptionsToHttpHeader(headers, options->copy_options);
/* filters */
if (filter_string && strcmp(filter_string, "") != 0)
{
churl_headers_append(headers, "X-GP-FILTER", filter_string);
churl_headers_append(headers, "X-GP-HAS-FILTER", "1");
}
else
churl_headers_append(headers, "X-GP-HAS-FILTER", "0");
}
/* Report alignment size to remote component
* GPDBWritable uses alignment that has to be the same as
* in the C code.
* Since the C code can be compiled for both 32 and 64 bits,
* the alignment can be either 4 or 8.
*/
static void
AddAlignmentSizeHttpHeader(CHURL_HEADERS headers)
{
char tmp[sizeof(char *)];
pg_ltoa(sizeof(char *), tmp);
churl_headers_append(headers, "X-GP-ALIGNMENT", tmp);
}
/*
* Report tuple description to remote component
* Currently, number of attributes, attributes names, types and types modifiers
* Each attribute has a pair of key/value
* where X is the number of the attribute
* X-GP-ATTR-NAMEX - attribute X's name
* X-GP-ATTR-TYPECODEX - attribute X's type OID (e.g, 16)
* X-GP-ATTR-TYPENAMEX - attribute X's type name (e.g, "boolean")
* optional - X-GP-ATTR-TYPEMODX-COUNT - total number of modifier for attribute X
* optional - X-GP-ATTR-TYPEMODX-Y - attribute X's modifiers Y (types which have precision info, like numeric(p,s))
*/
static void
AddTupleDescriptionToHttpHeader(CHURL_HEADERS headers, Relation rel)
{
char long_number[sizeof(int32) * 8];
StringInfoData formatter;
TupleDesc tuple;
initStringInfo(&formatter);
/* Get tuple description itself */
tuple = RelationGetDescr(rel);
/* Convert the number of attributes to a string */
pg_ltoa(tuple->natts, long_number);
churl_headers_append(headers, "X-GP-ATTRS", long_number);
/* Iterate attributes */
for (int i = 0; i < tuple->natts; ++i)
{
Form_pg_attribute attr = TupleDescAttr(tuple, i);
/* Add a key/value pair for attribute name */
resetStringInfo(&formatter);
appendStringInfo(&formatter, "X-GP-ATTR-NAME%u", i);
churl_headers_append(headers, formatter.data, attr->attname.data);
/* Add a key/value pair for attribute type */
resetStringInfo(&formatter);
appendStringInfo(&formatter, "X-GP-ATTR-TYPECODE%u", i);
pg_ltoa(attr->atttypid, long_number);
churl_headers_append(headers, formatter.data, long_number);
/* Add a key/value pair for attribute type name */
resetStringInfo(&formatter);
appendStringInfo(&formatter, "X-GP-ATTR-TYPENAME%u", i);
churl_headers_append(headers, formatter.data, TypeOidGetTypename(attr->atttypid));
/* Add attribute type modifiers if any */
if (attr->atttypmod > -1)
{
switch (attr->atttypid)
{
case NUMERICOID:
{
resetStringInfo(&formatter);
appendStringInfo(&formatter, "X-GP-ATTR-TYPEMOD%u-COUNT", i);
pg_ltoa(2, long_number);
churl_headers_append(headers, formatter.data, long_number);
/* precision */
resetStringInfo(&formatter);
appendStringInfo(&formatter, "X-GP-ATTR-TYPEMOD%u-%u", i, 0);
pg_ltoa((attr->atttypmod >> 16) & 0xffff, long_number);
churl_headers_append(headers, formatter.data, long_number);
/* scale */
resetStringInfo(&formatter);
appendStringInfo(&formatter, "X-GP-ATTR-TYPEMOD%u-%u", i, 1);
pg_ltoa((attr->atttypmod - VARHDRSZ) & 0xffff, long_number);
churl_headers_append(headers, formatter.data, long_number);
break;
}
case CHAROID:
case BPCHAROID:
case VARCHAROID:
{
resetStringInfo(&formatter);
appendStringInfo(&formatter, "X-GP-ATTR-TYPEMOD%u-COUNT", i);
pg_ltoa(1, long_number);
churl_headers_append(headers, formatter.data, long_number);
resetStringInfo(&formatter);
appendStringInfo(&formatter, "X-GP-ATTR-TYPEMOD%u-%u", i, 0);
pg_ltoa((attr->atttypmod - VARHDRSZ), long_number);
churl_headers_append(headers, formatter.data, long_number);
break;
}
case VARBITOID:
case BITOID:
case TIMESTAMPOID:
case TIMESTAMPTZOID:
case TIMEOID:
case TIMETZOID:
{
resetStringInfo(&formatter);
appendStringInfo(&formatter, "X-GP-ATTR-TYPEMOD%u-COUNT", i);
pg_ltoa(1, long_number);
churl_headers_append(headers, formatter.data, long_number);
resetStringInfo(&formatter);
appendStringInfo(&formatter, "X-GP-ATTR-TYPEMOD%u-%u", i, 0);
pg_ltoa((attr->atttypmod), long_number);
churl_headers_append(headers, formatter.data, long_number);
break;
}
case INTERVALOID:
{
resetStringInfo(&formatter);
appendStringInfo(&formatter, "X-GP-ATTR-TYPEMOD%u-COUNT", i);
pg_ltoa(1, long_number);
churl_headers_append(headers, formatter.data, long_number);
resetStringInfo(&formatter);
appendStringInfo(&formatter, "X-GP-ATTR-TYPEMOD%u-%u", i, 0);
pg_ltoa(INTERVAL_PRECISION(attr->atttypmod), long_number);
churl_headers_append(headers, formatter.data, long_number);
break;
}
default:
elog(DEBUG5, "addTupleDescriptionToHttpHeader: unsupported type %d ", attr->atttypid);
break;
}
}
}
pfree(formatter.data);
}
/*
* Report projection description to the remote component
*/
static void
AddProjectionDescHttpHeader(CHURL_HEADERS headers, List *retrieved_attrs)
{
ListCell *lc1 = NULL;
char long_number[sizeof(int32) * 8];
foreach(lc1, retrieved_attrs)
{
int attno = lfirst_int(lc1);
/* zero-based index in the server side */
AddProjectionIndexHeader(headers, attno - 1, long_number);
}
if (retrieved_attrs->length == 0)
return;
/* Convert the number of projection columns to a string */
pg_ltoa(retrieved_attrs->length, long_number);
churl_headers_append(headers, "X-GP-ATTRS-PROJ", long_number);
}
/*
* Adds the projection index header for the given attno
*/
static void
AddProjectionIndexHeader(CHURL_HEADERS headers,
int attno,
char *long_number)
{
pg_ltoa(attno, long_number);
churl_headers_append(headers, "X-GP-ATTRS-PROJ-IDX", long_number);
}
/*
* Add all the FDW options in the list to the curl headers
*/
static void
AddOptionsToHttpHeader(CHURL_HEADERS headers, List *options)
{
ListCell *cell;
foreach(cell, options)
{
DefElem *def = (DefElem *) lfirst(cell);
char *x_gp_key = NormalizeKeyName(def->defname);
churl_headers_append(headers, x_gp_key, defGetString(def));
pfree(x_gp_key);
}
}
/*
* Full name of the HEADER KEY expected by the PXF service
* Converts input string to upper case and prepends "X-GP-OPTIONS-" string
* This will be used for all user defined parameters to be isolate from internal parameters
*/
static char *
NormalizeKeyName(const char *key)
{
if (!key || strlen(key) == 0)
elog(ERROR, "internal error in pxfutils.c:normalize_key_name, parameter key is null or empty");
return psprintf("X-GP-OPTIONS-%s", asc_toupper(pstrdup(key), strlen(key)));
}
/*
* TypeOidGetTypename
* Get the name of the type, given the OID
*/
static char *
TypeOidGetTypename(Oid typid)
{
Assert(OidIsValid(typid));
HeapTuple typtup = SearchSysCache(TYPEOID,
ObjectIdGetDatum(typid),
0, 0, 0);
if (!HeapTupleIsValid(typtup))
elog(ERROR, "cache lookup failed for type %u", typid);
Form_pg_type typform = (Form_pg_type) GETSTRUCT(typtup);
char *typname = psprintf("%s", NameStr(typform->typname));
ReleaseSysCache(typtup);
return typname;
}
相关信息
相关文章
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦