greenplumn mapred 源码

  • 2022-08-18
  • 浏览 (384)

greenplumn mapred 代码

文件路径:/gpcontrib/gpmapreduce/src/mapred.c

#include <mapred.h>
#include <except.h>
#include <mapred_errors.h>

#include <stdio.h>
#include <stdarg.h>
#include <unistd.h>     /* for file "access" test */
#include <errno.h>

#define scalarfree(x)							\
	do {										\
		if (x) {								\
			free(x);							\
			x = NULL;							\
		}										\
	} while (0)


/* instantiate the extern from mapred.h */
const char *mapred_kind_name[MAPRED_MAXKIND+1] =
{
	"<NULL>",
	"DOCUMENT",
	"INPUT",
	"OUTPUT",
	"MAP",
	"TRANSITION",
	"CONSOLIDATE",
	"FINALIZE",
	"REDUCE",
	"TASK",
	"RUN",
	"INTERNAL"
};

/* instantiate default parameter names from mapred.h */
const char *default_parameter_names[MAPRED_MAXKIND+1][2] =
{
	{NULL, NULL},          /* MAPRED_NO_KIND */
	{NULL, NULL},          /* MAPRED_DOCUMENT */
	{NULL, NULL},          /* MAPRED_INPUT */
	{NULL, NULL},          /* MAPRED_OUTPUT */
	{"key", "value"},      /* MAPRED_MAPPER */
	{"state", "value"},    /* MAPRED_TRANSITION */
	{"state1", "state2"},  /* MAPRED_COMBINER */
	{"state", NULL},       /* MAPRED_FINALIZER */
	{NULL, NULL},          /* MAPRED_REDUCER */
	{NULL, NULL},          /* MAPRED_TASK */
	{NULL, NULL},          /* MAPRED_EXECUTION */
	{NULL, NULL}           /* MAPRED_ADT */
};

/* instantiate default parameter names from mapred.h */
const char *default_return_names[MAPRED_MAXKIND+1][2] =
{
	{NULL, NULL},          /* MAPRED_NO_KIND */
	{NULL, NULL},          /* MAPRED_DOCUMENT */
	{NULL, NULL},          /* MAPRED_INPUT */
	{NULL, NULL},          /* MAPRED_OUTPUT */
	{"key", "value"},      /* MAPRED_MAPPER */
	{"value", NULL},       /* MAPRED_TRANSITION */
	{"value", NULL},       /* MAPRED_COMBINER */
	{"value", NULL},       /* MAPRED_FINALIZER */
	{NULL, NULL},          /* MAPRED_REDUCER */
	{NULL, NULL},          /* MAPRED_TASK */
	{NULL, NULL},          /* MAPRED_EXECUTION */
	{NULL, NULL}           /* MAPRED_ADT */
};


/*
 * libpq Errors that we care about
 * (would be better to add <errcodes.h> to the include path)
 */
const char *IN_FAILED_SQL_TRANSACTION = "25P02";
const char *OBJ_DOES_NOT_EXIST        = "42P01";
const char *SCHEMA_DOES_NOT_EXIST     = "3F000";

const char *DISTRIBUTION_NOTICE = "NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause";

/* local prototypes */
void *         mapred_malloc(int size);
void             mapred_free(void *ptr);

buffer_t *        makebuffer(size_t bufsize, size_t grow);
void                bufreset(buffer_t *b);
void                  bufcat(buffer_t **bufp, char* fmt);
void   ignore_notice_handler(void *arg, const PGresult *res);
void    print_notice_handler(void *arg, const PGresult *res);
void    mapred_setup_columns(PGconn *conn, mapred_object_t *obj);

boolean mapred_create_object(PGconn *conn, mapred_document_t *doc,
							 mapred_object_t *obj);
void    mapred_remove_object(PGconn *conn, mapred_document_t *doc,
							 mapred_object_t *obj);
void      mapred_run_queries(PGconn *conn, mapred_document_t *doc);

void mapred_resolve_dependencies(PGconn *conn, mapred_document_t *doc);
void mapred_resolve_ref(mapred_olist_t *olist, mapred_reference_t *ref);
void mapred_resolve_object(PGconn *conn, mapred_document_t *doc,
						   mapred_object_t *obj, int *exec_count);

void lookup_function_in_catalog(PGconn *conn, mapred_document_t *doc,
								mapred_object_t *obj);
int mapred_obj_error(mapred_object_t *obj, char *fmt, ...)
	__attribute__((format(PG_PRINTF_ATTRIBUTE, 2, 3)));

/* Wrappers around malloc/free to handle error conditions more cleanly */
void *mapred_malloc(int size)
{
	void *m;
	XASSERT(size > 0);

	if (global_debug_flag && global_verbose_flag)
		fprintf(stderr, "Allocating %d bytes: ", size);

	m = malloc(size);
	if (!m)
		XRAISE(MEMORY_ERROR, "Memory allocation failure");

	if (global_debug_flag && global_verbose_flag)
		fprintf(stderr, "%p\n", m);

	return m;
}

#define copyscalar(s)							\
	strcpy(mapred_malloc(strlen(s)+1), s)

void mapred_free(void *ptr)
{
	XASSERT(ptr);

	if (global_debug_flag && global_verbose_flag)
		fprintf(stderr, "Freeing memory: %p\n", ptr);

	free(ptr);
}

int mapred_obj_error(mapred_object_t *obj, char *fmt, ...)
{
	va_list arg;

	if (global_verbose_flag)
		fprintf(stderr, "    - ");

	fprintf(stderr, "Error: ");
	if (obj && obj->name)
		fprintf(stderr, "%s '%s': ", mapred_kind_name[obj->kind], obj->name);
	if (obj && !obj->name)
		fprintf(stderr, "%s: ", mapred_kind_name[obj->kind]);

	va_start(arg, fmt);
	vfprintf(stderr, fmt, arg);
	va_end(arg);
	if (obj->line > 0)
		fprintf(stderr, ", at line %d\n", obj->line);
	else
		fprintf(stderr, "\n");

	return MAPRED_PARSE_ERROR;
}

static void mapred_obj_debug(mapred_object_t *obj)
{
	mapred_plist_t *plist;

	if (!obj)
	{
		fprintf(stderr, "Object is NULL");
		return;
	}
	fprintf(stderr, "%s: \n", mapred_kind_name[obj->kind]);
	fprintf(stderr, "  NAME: '%s': \n", obj->name ? obj->name : "-");
	switch (obj->kind)
	{
		case MAPRED_NO_KIND:
		case MAPRED_DOCUMENT:
		case MAPRED_ADT:
		case MAPRED_INPUT:
		case MAPRED_OUTPUT:
		case MAPRED_TASK:
		case MAPRED_EXECUTION:
		case MAPRED_REDUCER:
		{
			fprintf(stderr, "  DEBUG: 'debug output not yet implemented'\n");
			break;
		}

		case MAPRED_MAPPER:
		case MAPRED_TRANSITION:
		case MAPRED_COMBINER:
		case MAPRED_FINALIZER:
		{
			fprintf(stderr, "  LANGUAGE: %s\n", obj->u.function.language ?
					obj->u.function.language : "-");
			fprintf(stderr, "  PARAMETERS: [");
			for (plist = obj->u.function.parameters; plist; plist = plist->next)
			{
				fprintf(stderr, "%s %s%s", plist->name, plist->type,
						plist->next ? ", " : "");
			}
			fprintf(stderr,"]\n");
			fprintf(stderr, "  RETURNS: [");
			for (plist = obj->u.function.returns; plist; plist = plist->next)
			{
				fprintf(stderr, "%s %s%s", plist->name, plist->type,
						plist->next ? ", " : "");
			}
			fprintf(stderr,"]\n");
			fprintf(stderr, "  LIBRARY: %s\n", obj->u.function.library ?
					obj->u.function.library : "-");
			fprintf(stderr, "  FUNCTION: %s\n", obj->u.function.body ?
					obj->u.function.body : "-");
			break;
		}
	}
}


/* -------------------------------------------------------------------------- */
/* Functions that play with buffers                                           */
/* -------------------------------------------------------------------------- */
buffer_t *makebuffer(size_t bufsize, size_t grow)
{
	buffer_t *b;

	XASSERT(bufsize > 0 && grow > 0);

	b = mapred_malloc(sizeof(buffer_t) + bufsize);
	b->buffer   = (char*)(b+1);
	b->bufsize  = bufsize;
	b->grow     = grow;
	b->position = 0;
	b->buffer[0] = '\0';
	return b;
}

/* to re-use a buffer just "reset" it */
void bufreset(buffer_t *b)
{
	XASSERT(b && b->bufsize > 0 && b->grow > 0);
	b->position = 0;
	b->buffer[0] = '\0';
}

/*
 * A simple wrapper around a strncpy that handles resizing an input buffer
 * when needed.
 */
void bufcat(buffer_t **bufp, char* str)
{
	buffer_t  *b;
	size_t     len;

	XASSERT(bufp && *bufp);
	XASSERT(str);

	b = *bufp;
	len = strlen(str);

	/* If the buffer is too small, grow it */
	if (b->bufsize <= b->position + len)
	{
		buffer_t *newbuf;

		/* use the minumum of "grow" and the new length for the grow amount */
		if (b->grow <= len)
			b->grow = len+1;

		newbuf = makebuffer(b->bufsize + b->grow, b->grow);
		memcpy(newbuf->buffer, b->buffer, b->position+1);
		newbuf->position = b->position;
		*bufp = newbuf;
		mapred_free(b);
		b = newbuf;
	}

	/* We are now guaranteed that we have enough space in the buffer */
	XASSERT( b->bufsize - b->position > len );
	strcpy(b->buffer+b->position, str);
	b->position += len;
	b->buffer[b->position] = '\0';
}


/*
 * Currently we just ignore all warnings, may eventually do something
 * smarter, but this is preferable to dumping them to libpq's default
 * of dumping them to stderr.
 */
void ignore_notice_handler(void *arg, const PGresult *res)
{
}

void print_notice_handler(void *arg, const PGresult *res)
{
	char *error = PQresultErrorMessage(res);
	if (!strncmp(error, DISTRIBUTION_NOTICE, strlen(DISTRIBUTION_NOTICE)-1))
		return;

	if (global_verbose_flag)
		fprintf(stderr, "   - ");
	fprintf(stderr, "%s", error);
}

/*
 * If a function is already defined in the database we need to be able to
 * lookup the function information directly from the catalog.  This is
 * fairly similar to func_get_detail in backend/parser/parse_func.c, but
 * the lookup from yaml is slightly different because we don't know the
 * context that the function is in, but we _might_ have been told some
 * of the parameter information already.
 */
void lookup_function_in_catalog(PGconn *conn, mapred_document_t *doc,
								mapred_object_t *obj)
{
	PGresult			*result	 = NULL;
	PGresult			*result2 = NULL;
	mapred_plist_t		*plist, *plist2;
	mapred_plist_t		*newitem = NULL;
	mapred_plist_t		*returns = NULL;
	buffer_t			*buffer	 = NULL;
	char				*tmp1	 = NULL;
	char				*tmp2	 = NULL;
	char				*tmp3	 = NULL;
#define STR_LEN 50
	char				str[STR_LEN];
	int					i, nargs;

	XASSERT(doc);
	XASSERT(obj);
	XASSERT(obj->kind == MAPRED_MAPPER     ||
			obj->kind == MAPRED_TRANSITION ||
			obj->kind == MAPRED_COMBINER   ||
			obj->kind == MAPRED_FINALIZER);

	obj->internal = true;
	obj->u.function.internal_returns = NULL;

	XTRY
	{
		buffer = makebuffer(1024, 1024);

		/* Try to lookup the specified function */
		bufcat(&buffer,
			   "SELECT proretset, prorettype::regtype, pronargs,\n"
			   "       proargnames, proargmodes, \n"
			   "       (proargtypes::regtype[])[0:pronargs] as proargtypes,\n"
			   "       proallargtypes::regtype[],\n");

		/*
		 * If we have return types defined in the yaml then we want to resolve
		 * them to their authorative names for comparison purposes.
		 */
		if (obj->u.function.returns)
		{
			bufcat(&buffer, "       ARRAY[");
			for (plist = obj->u.function.returns; plist; plist = plist->next)
			{
				if (plist->type)
				{
					bufcat(&buffer, "'");
					bufcat(&buffer, plist->type);
					bufcat(&buffer, "'::regtype");
				}
				else
				{
					/* If we don't know the type, punt */
					bufcat(&buffer, "'-'::regtype");
				}
				if (plist->next)
					bufcat(&buffer, ", ");
			}
			bufcat(&buffer, "] as yaml_rettypes\n");
		}
		else
		{
			bufcat(&buffer, "       null::regtype[] as yaml_rettypes\n");
		}

		bufcat(&buffer,
			   "FROM   pg_proc\n"
			   "WHERE  prokind = 'f'\n"
			   "  AND  proname = lower('");
		bufcat(&buffer, obj->name);
		bufcat(&buffer, "')\n");

		/* Fill in the known parameter types */
		nargs = 0;
		if (obj->u.function.parameters)
		{
			bufcat(&buffer, "  AND  (proargtypes::regtype[])[0:pronargs] = ARRAY[");
			for (plist = obj->u.function.parameters; plist; plist = plist->next)
			{
				nargs++;
				bufcat(&buffer, "'");
				bufcat(&buffer, plist->type);
				bufcat(&buffer, "'::regtype");
				if (plist->next)
					bufcat(&buffer, ", ");
			}
			snprintf(str, STR_LEN, "]\n  AND pronargs=%d\n", nargs);
			bufcat(&buffer, str);
		}

		/* Run the SQL */
		if (global_print_flag || global_debug_flag)
			printf("%s", buffer->buffer);
		result = PQexec(conn, buffer->buffer);
		bufreset(buffer);

		if (PQresultStatus(result) != PGRES_TUPLES_OK)
		{
			/*
			 * The SQL statement failed:
			 * Most likely scenario is a bad datatype causing the regtype cast
			 * to fail.
			 */
			char *code  = PQresultErrorField(result, PG_DIAG_SQLSTATE);
			char *error = PQresultErrorMessage(result);

			printf("errcode=\"%s\"\n", code);  /* Todo: validate expected error code */

			mapred_obj_error(obj, "SQL Error resolving function: \n  %s",
							 error);
			XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure");
		}
		else if (PQntuples(result) == 0)
		{
			/* No such function */
			mapred_obj_error(obj, "No such function");
			XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure");
		}
		else if (PQntuples(result) > 1)
		{
			XASSERT(!obj->u.function.parameters);
			mapred_obj_error(obj, "Ambiguous function, supply a function "
							 "prototype for disambiguation");
			XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure");
		}
		else
		{
			char		*value;
			int          len;
			boolean		 retset;
			int          nargs;
			char		*argtypes	 = NULL;
			char		*argnames	 = "";
			char		*argmodes	 = "";
			char		*allargtypes = "";
			char		*rettype	 = NULL;
			char        *yaml_rettypes = "";
			char        *type, *typetokens = NULL;
			char        *name, *nametokens = NULL;
			char        *mode, *modetokens = NULL;
			boolean     name_end, mode_end;

			value = PQgetvalue(result, 0, 0);   /* Column 0: proretset */
			retset = (value[0] == 't');
			value = PQgetvalue(result, 0, 1);   /* Column 1: prorettype */
			rettype = value;
			value = PQgetvalue(result, 0, 2);   /* Column 2: pronargs */
			nargs = (int) strtol(value, (char **) NULL, 10);

			/*
			 * Arrays are formatted as:  "{value,value,...}"
			 * of which we only want "value,value, ..."
			 * so find the part of the string between the braces
			 */
			if (!PQgetisnull(result, 0, 3))   /* Column 3: proargnames */
			{
				value = PQgetvalue(result, 0, 3);
				argnames = value+1;
				len = strlen(argnames);
				if (len > 0)
					argnames[len-1] = '\0';
			}

			if (!PQgetisnull(result, 0, 4))   /* Column 4: proargmodes */
			{
				value = PQgetvalue(result, 0, 4);
				argmodes = value+1;
				len = strlen(argmodes);
				if (len > 0)
					argmodes[len-1] = '\0';
			}
			if (!PQgetisnull(result, 0, 5))   /* Column 5: proargtypes */
			{
				value = PQgetvalue(result, 0, 5);
				argtypes = value+1;
				len = strlen(argtypes);
				if (len > 0)
					argtypes[len-1] = '\0';
			}
			if (!PQgetisnull(result, 0, 6))   /* Column 6: proallargtypes */
			{
				value = PQgetvalue(result, 0, 6);
				allargtypes = value+1;
				len = strlen(allargtypes);
				if (len > 0)
					allargtypes[len-1] = '\0';
			}
			if (!PQgetisnull(result, 0, 7))   /* Column 7: yaml_rettypes */
			{
				value = PQgetvalue(result, 0, 7);
				yaml_rettypes = value+1;
				len = strlen(yaml_rettypes);
				if (len > 0)
					yaml_rettypes[len-1] = '\0';
			}

			/*
			 * These constraints should all be enforced in the catalog, so
			 * if something is wrong then it's a coding error above.
			 */
			XASSERT(rettype);
			XASSERT(argtypes);
			XASSERT(nargs >= 0);

			/*
			 * If we just derived the parameters from the catalog then we
			 * need complete our internal metadata.
			 */
			plist = NULL;
			if (!obj->u.function.parameters)
			{
				/* strtok is destructive and we need to preserve the original
				 * string, so we make some annoying copies prior to strtok.
				 */
				tmp1 = copyscalar(argtypes);
				tmp2 = copyscalar(argnames);
				tmp3 = copyscalar(argmodes);

				type = strtok_r(tmp1, ",", &typetokens);
				name = strtok_r(tmp2, ",", &nametokens);
				mode = strtok_r(tmp3, ",", &modetokens);

				/*
				 * Name and mode are used for IN/OUT parameters and may not be
				 * present.  In the event that they are we are looking for:
				 *   - the "i" (in) arguments
				 *   - the "b" (inout) arguments
				 * we skip over:
				 *   - the "o" (out) arguments.
				 *   - the "t" (table out) arguments.
				 *
				 * Further it is possible for some of the arguments to be named
				 * and others to be unnamed.  The unnamed arguments will show
				 * up as "" (two quotes, not an empty string) if there is an
				 * argnames defined.
				 *
				 * If argmodes is not defined then all names in proargnames
				 * refer to input arguments.
				 */

				while (mode && strcmp(mode, "i") && strcmp(mode, "b"))
				{
					name = strtok_r(NULL, ",", &nametokens);
					mode = strtok_r(NULL, ",", &modetokens);
				}
				name_end = (NULL == name);
				mode_end = (NULL == mode);

				i = 0;
				while (type)
				{
					/* Keep track of which parameter we are on */
					i++;
					XASSERT(i <= nargs);

					/*
					 * If a name was not specified by the user, and was not
					 * specified by the in/out parameters then we assign it a
					 * default name.
					 */
					if (!name)
					{
						/* single argument functions always default to "value" */
						if (i == 1 && nargs == 1)
							name = (char*) "value";

						/* Base name on default parameter names for the first
						 * two arguments */
						else if (i <= 2)
							name = (char*) default_parameter_names[obj->kind][i-1];

						/*
						 * If we still didn't decide on a name, make up
						 * something useless.
						 */
						if (!name)
						{
							snprintf(str, STR_LEN, "parameter%d", i);
							name = str;
						}
					}

					if (!plist)
					{
						plist = mapred_malloc(sizeof(mapred_plist_t));
						plist->name = copyscalar(name);
						plist->type = copyscalar(type);
						plist->next = (mapred_plist_t *) NULL;
						obj->u.function.parameters = plist;
					}
					else
					{
						plist->next = mapred_malloc(sizeof(mapred_plist_t));
						plist = plist->next;
						plist->name = copyscalar(name);
						plist->type = copyscalar(type);
						plist->next = (mapred_plist_t *) NULL;
					}

					/* Procede to the next parameter */
					type = strtok_r(NULL, ",", &typetokens);
					if (!name_end)
					{
						name = strtok_r(NULL, ",", &nametokens);
						name_end = (NULL == name);
					}
					if (!mode_end)
					{
						mode = strtok_r(NULL, ",", &modetokens);
						mode_end = (NULL == mode);
					}
					while (mode && strcmp(mode, "i") && strcmp(mode, "b"))
					{
						if (!name_end)
						{
							name = strtok_r(NULL, ",", &nametokens);
							name_end = (NULL == name);
						}
						if (!mode_end)
						{
							mode = strtok_r(NULL, ",", &modetokens);
							mode_end = (NULL == mode);
						}
					}
				}

				mapred_free(tmp1);
				mapred_free(tmp2);
				mapred_free(tmp3);
				tmp1 = NULL;
				tmp2 = NULL;
				tmp3 = NULL;
			}

			/*
			 * Check that the number of parameters received is appropriate.
			 * This would be better moved to a generalized validation routine.
			 */
			switch (obj->kind)
			{

				case MAPRED_MAPPER:
					/*
					 * It would probably be possible to start supporting zero
					 * argument mappers, but:
					 *   1) It would require more modifications
					 *   2) Doesn't currently have a known use case
					 *   3) Has easy workarounds
					 */
					if (nargs < 1)
					{
						mapred_obj_error(obj, "Transition functions require "
										 "two or more parameters");
						XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure");
					}
					break;

				case MAPRED_TRANSITION:
					if (nargs < 2)
					{
						mapred_obj_error(obj, "Transition functions require "
										 "two or more parameters");
						XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure");
					}
					if (retset)
					{
						mapred_obj_error(obj, "Transition functions cannot "
										 "be table functions");
						XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure");
					}
					break;

				case MAPRED_COMBINER:
					if (nargs != 2)
					{
						mapred_obj_error(obj, "Consolidate functions require "
										 "exactly two parameters");
						XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure");
					}
					if (retset)
					{
						mapred_obj_error(obj, "Consolidate functions cannot "
										 "be table functions");
						XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure");
					}
					break;

				case MAPRED_FINALIZER:
					if (nargs != 1)
					{
						mapred_obj_error(obj, "Finalize functions require "
										 "exactly one parameter");
						XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure");
					}
					break;

				default:
					XASSERT(false);
			}

			/* Fill in return type information */
			if (retset)
				obj->u.function.mode = MAPRED_MODE_MULTI;
			else
				obj->u.function.mode = MAPRED_MODE_SINGLE;

			/*
			 * Determine the return type information, there are 3 primary
			 * subcases:
			 *
			 *  1) Function is defined with OUT/TABLE parameters.
			 *  2) Function returns a simple type.
			 *  3) Function returns a complex type.
			 *  4) Return type is void [error]
			 */
			plist = returns = NULL;
			if (argmodes && strlen(argmodes) > 0)
			{

				/* strtok is destructive and we need to preserve the original
				 * string, so we make some annoying copies prior to strtok.
				 */
				tmp1 = copyscalar(allargtypes);
				tmp2 = copyscalar(argnames);
				tmp3 = copyscalar(argmodes);

				type = strtok_r(tmp1, ",", &typetokens);
				name = strtok_r(tmp2, ",", &nametokens);
				mode = strtok_r(tmp3, ",", &modetokens);

				i = 1;
				while (mode)
				{
					while (mode &&
						   strcmp(mode, "o") &&
						   strcmp(mode, "b") &&
						   strcmp(mode, "t"))
					{
						/* skip input parameters */
						type = strtok_r(NULL, ",", &typetokens);
						name = strtok_r(NULL, ",", &nametokens);
						mode = strtok_r(NULL, ",", &modetokens);
					}
					if (mode)
					{
						XASSERT(type);

						newitem = mapred_malloc(sizeof(mapred_plist_t));

						/*
						 * Note we haven't made local copies of these, we will
						 * do this after resolution when validating against any
						 * RETURNS defined in the yaml, if any.
						 */

						if( NULL != name &&
								0 != strcmp(name, "") &&
								0 != strcmp(name, "\"\"") )
						{
							/*if name defined in db, just use it*/
							newitem->name = copyscalar(name);
						}
						else
						{
							/*else just obey the default name in db*/
							snprintf( str, STR_LEN, "column%d", i);
							newitem->name = copyscalar(str);
						}

						newitem->type = copyscalar(type);

						newitem->next = NULL;

						if (plist)
							plist->next = newitem;
						else
							returns = newitem;
						plist = newitem;
						++i;
					}
					type = strtok_r(NULL, ",", &typetokens);
					name = strtok_r(NULL, ",", &nametokens);
					mode = strtok_r(NULL, ",", &modetokens);
				}

				mapred_free(tmp1);
				mapred_free(tmp2);
				mapred_free(tmp3);
				tmp1 = NULL;
				tmp2 = NULL;
				tmp3 = NULL;
			}

			/*
			 * If the arguments were not defined in the function definition then
			 * we check to see if this was a complex type by looking up the type
			 * information in pg_attribute.
			 */
			if (!returns)
			{
				bufcat(&buffer,
					   "SELECT attname, atttypid::regtype\n"
					   "FROM   pg_attribute a\n"
					   "JOIN   pg_class c on (a.attrelid = c.oid)\n"
					   "WHERE  not a.attisdropped\n"
					   "  AND  a.attnum > 0\n"
					   "  AND  c.reltype = '");
				bufcat(&buffer, rettype);
				bufcat(&buffer,
					   "'::regtype\n"
					   "ORDER BY -attnum");
				result2 = PQexec(conn, buffer->buffer);
				bufreset(buffer);

				if (PQresultStatus(result2) != PGRES_TUPLES_OK)
				{
					char *error = PQresultErrorMessage(result);

					mapred_obj_error(obj, "Error resolving function: %s", error);
					XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure");
				}
				else if (PQntuples(result2) > 0)
				{
					/* We have a complex type, build the return list */
					for (i = 0; i < PQntuples(result2); i++)
					{
						name = PQgetvalue(result2, i, 0);
						type = PQgetvalue(result2, i, 1);

						newitem = mapred_malloc(sizeof(mapred_plist_t));
						newitem->name = copyscalar(name);
						newitem->type = copyscalar(type);
						newitem->next = returns;
						returns = newitem;
					}
				}
			}

			/*
			 * If the return types were not defined in either the argument list
			 * nor the catalog then we assume it is a simple type.
			 */
			if (!returns)
			{
				/* Check against "void" which is a special return type that
				 * means there is no return value - which we don't support for
				 * mapreduce.
				 */
				if (!strcmp(rettype, "void"))
				{
					mapred_obj_error(obj, "Function returns void");
					XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure");
				}
				returns = mapred_malloc(sizeof(mapred_plist_t));
				returns->type = copyscalar(rettype);
				returns->name = NULL;
				returns->next = NULL;
			}

			/*
			 * We now should have a returns list, compare it against the RETURNS
			 * list given in the yaml.  The yaml overrides return names, but can
			 * not override return types.  If the return types are incompatible
			 * raise an error.
			 */
			obj->u.function.internal_returns = returns;
			if (obj->u.function.returns)
			{
				/*
				 * The first thing to do is normalize the given return types
				 * with their formal names.  This will, for example turn a type
				 * like "float8" => "double precision".  The input name might
				 * be correct (float8) but we need it represented as the formal
				 * name so that we can compare against the formal name we got
				 * when we looked up the function in the catalog.
				 */
				plist = obj->u.function.returns;
				type = strtok_r(yaml_rettypes, ",", &typetokens);
				while (plist)
				{
					XASSERT(type);  /* should be an equal number */

					/*
					 * If we have a type specified replace it with the one we
					 * resolved from the select stmt, otherwise just keep it
					 * as NULL and fill it in during the compare against what
					 * was in the catalog.
					 */
					if (plist->type)
					{
						mapred_free(plist->type);

						/*
						 * When in an array the typname may get wrapped in
						 * double quotes, if so we need to strip them back out.
						 */
						if (type[0] == '"')
						{
							plist->type = copyscalar(type+1);
							plist->type[strlen(plist->type)-1] = '\0';
						}
						else
						{
							plist->type = copyscalar(type);
						}
					}

					plist = plist->next;
					type = strtok_r(NULL, ",", &typetokens);
				}


				/* Compare against actual function return types */
				plist = obj->u.function.returns;
				plist2 = returns;
				while (plist && plist2)
				{
					XASSERT(plist->name);   /* always defined in YAML */
					XASSERT(plist2->type);  /* always defined in SQL */

					/*
					 * In the YAML it is possible to have a name without a type,
					 * if that is the case then simply take the SQL type.
					 */
					if (!plist->type)
						plist->type = copyscalar(plist2->type);
					else if (strcmp(plist->type, plist2->type))
						break;
					plist  = plist->next;
					plist2 = plist2->next;
				}
				if (plist || plist2)
				{
					mapred_obj_error(obj, "RETURN parameter '%s %s' != '%s %s'",
									 plist ? plist->name : "\"\"",
									 plist ? plist->type : "-",
									 plist2 ? (plist2->name ? plist2->name : plist->name) : "\"\"",
									 plist2 ? plist2->type : "-");
					XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure");
				}
			}
			else
			{
				obj->u.function.returns = returns;
				i = 0;
				for (plist = returns; plist; plist = plist->next)
				{
					XASSERT(plist->type);
					i++;
					plist->type = copyscalar(plist->type);

					/*
					 * if plist->name is not null and empty string,
					 * then use that name
					 */
					if (plist->name &&
							0 != strcmp(plist->name, "") &&
							0 != strcmp(plist->name, "\"\"") )
					{
						plist->name = copyscalar(plist->name);
					}
					/*
					 * else We need generate a name anyway
					 */
					else
					{
						/*
						 * Manufacture a name for a column based on default
						 * naming rules.
						 */
						name = (char*) NULL;
						if (i <= 2)
							name = (char*) default_return_names[obj->kind][i-1];
						if (!name)
						{
							snprintf(str, STR_LEN, "parameter%d", i);
							name = str;
						}
						plist->name = copyscalar(name);
					}
				}
			}
		}
	}
	XFINALLY
	{
		if (result)
			PQclear(result);
		if (result2)
			PQclear(result2);
		if (buffer)
			mapred_free(buffer);

		if (tmp1)
			mapred_free(tmp1);
		if (tmp2)
			mapred_free(tmp2);
		if (tmp3)
			mapred_free(tmp3);

	}
	XTRY_END;
}

void mapred_run_document(PGconn *conn, mapred_document_t *doc)
{
	PGresult       *result;
	mapred_olist_t *olist;
	boolean         done;
	boolean         executes;

	/* Ignore NOTICE messages from database */
	PQsetNoticeReceiver(conn, ignore_notice_handler, NULL);

	/* Establish a name-prefix for temporary objects */
	doc->prefix = mapred_malloc(64);
	snprintf(doc->prefix, 64, "mapreduce_%d_", PQbackendPID(conn));


	/*
	 * Resolution of dependecies was defered until now so that
	 * a database connection could be available to look up any
	 * dependencies that are not defined within the YAML document.
	 */
	if (global_verbose_flag)
		fprintf(stderr, "  - Resolving Dependencies:\n");
	mapred_resolve_dependencies(conn, doc);
	if (global_verbose_flag)
		fprintf(stderr, "    - DONE\n");

	XTRY
	{
		/*
		 * By running things within a transaction we can effectively
		 * obscure the mapreduce sql definitions.  They could still
		 * be exposed by a savy user via mapreduce views that access
		 * the catalog tables, but it's a cleaner method of handling
		 * things.
		 */
		result = PQexec(conn, "BEGIN TRANSACTION");
		if (PQresultStatus(result) != PGRES_COMMAND_OK)
		{
			PQclear(result);
			XRAISE(MAPRED_SQL_ERROR, NULL);
		}
		PQclear(result);

		/* With dependencies resolved we can now try creating objects */
		if (global_verbose_flag)
			fprintf(stderr, "  - Creating Objects\n");

		/*
		 * we don't try to create any executes until all non-executes are
		 * successfully created
		 */
		executes = false;
		do
		{
			boolean progress = false;

			/*
			 * We keep a buffer of errors during parsing, and display them at
			 * the end if they haven't been resolved.
			 */
			if (!doc->errors)
				doc->errors = makebuffer(1024, 1024);
			else
				bufreset(doc->errors);

			/*
			 * Loop through the objects, creating each in turn.
			 * If an object has dependencies that have not been created yet
			 * it will return false and we will make additional passes through
			 * the object list
			 */
			done  = true;
			for (olist = doc->objects; olist; olist = olist->next)
			{
				mapred_object_t *obj = olist->object;
				if (!obj->created &&
					(executes || obj->kind != MAPRED_EXECUTION))
				{
					if (global_verbose_flag && obj->kind != MAPRED_ADT)
					{
						fprintf(stderr, "    - %s:\n",
								mapred_kind_name[obj->kind]);
						fprintf(stderr, "       NAME: %s\n", obj->name);
					}

					if (!mapred_create_object(conn, doc, obj))
						done = false;
					else
						progress = true;
				}
			}

			/*
			 * If all non-execute objects have been created then switch over
			 * and start creating the execution jobs
			 */
			if (done && !executes)
			{
				executes = true;
				done = false;
			}

			/*
			 * If we looped through the list, we are not done, and no progress
			 * was made then we have an infinite cycle and should probably stop.
			 */
			if (!done && !progress)
			{
				if (doc->errors && doc->errors->position > 0)
					fprintf(stderr, "%s", doc->errors->buffer);
				XRAISE(MAPRED_PARSE_ERROR,
					   "Unable to make forward progress creating objects\n");
			}

		} while (!done);

		/* objects created, execute queries */
		mapred_run_queries(conn, doc);
	}
	XCATCH(MAPRED_SQL_ERROR)
	{
		if (global_verbose_flag)
			fprintf(stderr, "    - ");
		fprintf(stderr, "%s", PQerrorMessage(conn));
		XRERAISE();
	}
	XFINALLY
	{
		/* Remove all the objects that we created */
		if (global_print_flag || global_debug_flag)
			printf("\n");
		for (olist = doc->objects; olist; olist = olist->next)
			mapred_remove_object(conn, doc, olist->object);
		if (global_print_flag || global_debug_flag)
			printf("\n");

		/*
		 * We always commit the transaction, even on failure since the failure
		 * may have occured after we generated some output tables and we want
		 * to keep the partial results.
		 */
		result = PQexec(conn, "COMMIT TRANSACTION");
		if (PQresultStatus(result) != PGRES_COMMAND_OK)
		{
			if (global_verbose_flag)
				fprintf(stderr, "    - ");
			fprintf(stderr, "%s", PQerrorMessage(conn));
		}
	}
	XTRY_END;
}


void mapred_resolve_dependencies(PGconn *conn, mapred_document_t *doc)
{
	mapred_olist_t     *olist;
	int                 exec_count = 0;

	/* Walk the list of objects */
	for (olist = doc->objects; olist; olist = olist->next)
		mapred_resolve_object(conn, doc, olist->object, &exec_count);

}

void mapred_resolve_object(PGconn *conn, mapred_document_t *doc,
						   mapred_object_t *obj, int *exec_count)
{
	mapred_olist_t  *newlist;
	mapred_object_t *sub;   /* sub-object */
	size_t           len;

	switch (obj->kind)
	{
		/* Objects with no dependencies */
		case MAPRED_OUTPUT:
		case MAPRED_ADT:
			break;

		case MAPRED_INPUT:
			/*
			 * For FILE/GPFDIST/EXEC inputs we will create a name-prefixed
			 * version of the object to prevent name collisions, and then
			 * create a second temporary view over the external table to
			 * support access to the input by "name".  This involves creating
			 * a second copy of the input which we place directly after the
			 * original input in the document object list.
			 */
			if (obj->u.input.type == MAPRED_INPUT_GPFDIST ||
				obj->u.input.type == MAPRED_INPUT_FILE    ||
				obj->u.input.type == MAPRED_INPUT_EXEC)
			{
				mapred_object_t *newinput;
				mapred_olist_t  *parent;

				newinput = mapred_malloc(sizeof(mapred_object_t));
				memset(newinput, 0, sizeof(mapred_object_t));
				newinput->kind = MAPRED_INPUT;
				len = strlen(obj->name) + 1;
				newinput->name = mapred_malloc(len);
				sprintf(newinput->name, "%s", obj->name);
				newinput->u.input.type = MAPRED_INPUT_QUERY;
				len = strlen(doc->prefix) + strlen(obj->name) + 16;
				newinput->u.input.desc = mapred_malloc(len);
				snprintf(newinput->u.input.desc, len,
						 "select * from %s%s",
						 doc->prefix, obj->name);

				/*
				 * Find parent input in the doclist and add the new object
				 * immediately after it.
				 */
				for (parent = doc->objects;
					 parent && parent->object != obj;
					 parent = parent->next);
				XASSERT(parent);
				newlist = mapred_malloc(sizeof(mapred_olist_t));
				newlist->object = newinput;
				newlist->next = parent->next;
				parent->next = newlist;
			}
			break;


		case MAPRED_MAPPER:
		case MAPRED_TRANSITION:
		case MAPRED_COMBINER:
		case MAPRED_FINALIZER:


			/*
			 * If the function is an internal function then we try to resolve
			 * the function by looking it up in the catalog.
			 */
			obj->u.function.internal_returns = NULL;
			obj->internal = false;

			if (!obj->u.function.language)
			{
				obj->internal = true;
				lookup_function_in_catalog(conn, doc, obj);
			}
			/* ??? */
			else if (!obj->u.function.returns)
			{
				XASSERT(false);
			}

			/*
			 * The function types may manufacture a dependency on an adt,
			 * but have no other dependencies.
			 */
			else if (obj->u.function.returns->next)
			{
				sub = mapred_malloc(sizeof(mapred_object_t));
				memset(sub, 0, sizeof(mapred_object_t));
				sub->kind = MAPRED_ADT;
				len = strlen(doc->prefix) + strlen(obj->name) + 7;
				sub->name = mapred_malloc(len);
				snprintf(sub->name, len, "%s%s_rtype",
						 doc->prefix, obj->name);
				sub->u.adt.returns = obj->u.function.returns;

				obj->u.function.rtype.name = sub->name;
				obj->u.function.rtype.object = sub;

				/* Add the ADT to the list of document objects */
				newlist = mapred_malloc(sizeof(mapred_olist_t));
				newlist->object = sub;
				newlist->next = doc->objects;
				doc->objects = newlist;

				/* And resolve the sub-object */
				mapred_resolve_object(conn, doc, sub, exec_count);
			}
			else
			{
				obj->u.function.rtype.name = obj->u.function.returns->type;
				obj->u.function.rtype.object = NULL;
			}
			break;


		case MAPRED_REDUCER:
		{
			/*
			 * If we have a function, but no object then we assume that it is
			 * a database function.  Create a dummy object to handle this case.
			 */
			mapred_resolve_ref(doc->objects, &obj->u.reducer.transition);
			if (obj->u.reducer.transition.name &&
				!obj->u.reducer.transition.object)
			{
				len = strlen(obj->u.reducer.transition.name) + 1;
				sub = mapred_malloc(sizeof(mapred_object_t));
				memset(sub, 0, sizeof(mapred_object_t));
				sub->kind = MAPRED_TRANSITION;
				sub->name = mapred_malloc(len);
				sub->line = obj->line;
				strncpy(sub->name, obj->u.reducer.transition.name, len);

				newlist = mapred_malloc(sizeof(mapred_olist_t));
				newlist->object = sub;
				newlist->next = doc->objects;
				doc->objects = newlist;
				obj->u.reducer.transition.object = sub;

				/* And resolve the sub-object */
				mapred_resolve_object(conn, doc, sub, exec_count);
			}
			mapred_resolve_ref(doc->objects, &obj->u.reducer.combiner);
			if (obj->u.reducer.combiner.name &&
				!obj->u.reducer.combiner.object)
			{
				len = strlen(obj->u.reducer.combiner.name) + 1;
				sub = mapred_malloc(sizeof(mapred_object_t));
				memset(sub, 0, sizeof(mapred_object_t));
				sub->kind = MAPRED_COMBINER;
				sub->name = mapred_malloc(len);
				sub->line = obj->line;
				strncpy(sub->name, obj->u.reducer.combiner.name, len);

				newlist = mapred_malloc(sizeof(mapred_olist_t));
				newlist->object = sub;
				newlist->next = doc->objects;
				doc->objects = newlist;
				obj->u.reducer.combiner.object = sub;

				/* And resolve the sub-object */
				mapred_resolve_object(conn, doc, sub, exec_count);
			}
			mapred_resolve_ref(doc->objects, &obj->u.reducer.finalizer);
			if (obj->u.reducer.finalizer.name &&
				!obj->u.reducer.finalizer.object)
			{
				len = strlen(obj->u.reducer.finalizer.name) + 1;
				sub = mapred_malloc(sizeof(mapred_object_t));
				memset(sub, 0, sizeof(mapred_object_t));
				sub->kind = MAPRED_FINALIZER;
				sub->name = mapred_malloc(len);
				sub->line = obj->line;
				strncpy(sub->name, obj->u.reducer.finalizer.name, len);

				newlist = mapred_malloc(sizeof(mapred_olist_t));
				newlist->object = sub;
				newlist->next = doc->objects;
				doc->objects = newlist;
				obj->u.reducer.finalizer.object = sub;

				/* And resolve the sub-object */
				mapred_resolve_object(conn, doc, sub, exec_count);
			}

			break;
		}


		case MAPRED_TASK:
		case MAPRED_EXECUTION:
		{
			/*
			 * Resolving a task may require recursion to resolve other
			 * tasks to work out parameter lists.  We keep track of
			 * our resolution state in order to detect potential
			 * infinite recursion issues.
			 */
			if (obj->u.task.flags & mapred_task_resolved)
				return;

			/* Assign a name to anonymous executions */
			if (!obj->name)
			{
				size_t     len;

				XASSERT(obj->u.task.execute);

				/* 10 characters for max int digits, 4 for "run_" */
				len = strlen(doc->prefix) + 16;
				obj->name = mapred_malloc(len);
				snprintf(obj->name, len, "%srun_%d",
						 doc->prefix, ++(*exec_count));
			}

			/* Check for infinite recursion */
			if (obj->u.task.flags & mapred_task_resolving)
			{
				mapred_obj_error(obj, "Infinite recursion detected while "
								 "trying to resove TASK");
				XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure");
			}
			obj->u.task.flags |= mapred_task_resolving;

			/* Validate object types */
			if (obj->u.task.input.name)
			{
				mapred_resolve_ref(doc->objects, &obj->u.task.input);
				sub = obj->u.task.input.object;

				/* If we can't find the input, throw an error */
				if (!sub)
				{
					/* Can't find INPUT object */
					mapred_obj_error(obj, "SOURCE '%s' not found in document",
									 obj->u.task.input.name);
					XRAISE(MAPRED_PARSE_ERROR, "Object Resolution Failure");
				}

				/*
				 * The input must either be an INPUT or a TASK
				 */
				switch (sub->kind)
				{
					case MAPRED_INPUT:
						break;

					case MAPRED_TASK:
						/* This objects input is the sub objects output */
						mapred_resolve_object(conn, doc, sub, exec_count);
						break;

						/* Otherwise generate an error */
					default:

						/* SOURCE wasn't an INPUT */
						mapred_obj_error(obj, "SOURCE '%s' is neither an INPUT nor a TASK",
										 obj->u.task.input.name);
						XRAISE(MAPRED_PARSE_ERROR, "Object Resolution Failure");
				}
			}

			if (obj->u.task.mapper.name)
			{
				mapred_resolve_ref(doc->objects, &obj->u.task.mapper);
				sub = obj->u.task.mapper.object;

				if (!sub)
				{
					/* Create an internal map function */
					len = strlen(obj->u.task.mapper.name) + 1;
					sub = mapred_malloc(sizeof(mapred_object_t));
					memset(sub, 0, sizeof(mapred_object_t));
					sub->kind = MAPRED_MAPPER;
					sub->name = mapred_malloc(len);
					sub->line = obj->line;
					strncpy(sub->name, obj->u.task.mapper.name, len);

					newlist = mapred_malloc(sizeof(mapred_olist_t));
					newlist->object = sub;
					newlist->next = doc->objects;
					doc->objects = newlist;
					obj->u.task.mapper.object = sub;

					/* And resolve the sub-object */
					mapred_resolve_object(conn, doc, sub, exec_count);
				}
				else
				{
					/* Allow any function type */
					switch (sub->kind)
					{
						case MAPRED_MAPPER:
						case MAPRED_TRANSITION:
						case MAPRED_COMBINER:
						case MAPRED_FINALIZER:
							break;

						default:
							mapred_obj_error(obj, "MAP '%s' is not a MAP object",
											 obj->u.task.mapper.name);
							XRAISE(MAPRED_PARSE_ERROR, "Object Resolution Failure");
					}
				}
			}

			if (obj->u.task.reducer.name)
			{
				mapred_resolve_ref(doc->objects, &obj->u.task.reducer);
				sub = obj->u.task.reducer.object;

				if (!sub)
				{
					/* FIXME: non-yaml reducers */
				}
				else if (sub->kind == MAPRED_REDUCER)
				{   /* Validate Reducer */
					mapred_resolve_object(conn, doc, sub, exec_count);
				}
				else
				{   /* It's an object, but not a REDUCER */
					mapred_obj_error(obj, "REDUCE '%s' is not a REDUCE object",
									 obj->u.task.reducer.name);
					XRAISE(MAPRED_PARSE_ERROR, "Object Resolution Failure");
				}
			}

			if (obj->u.task.output.name)
			{
				mapred_resolve_ref(doc->objects, &obj->u.task.output);

				sub = obj->u.task.output.object;
				if (sub && sub->kind != MAPRED_OUTPUT)
				{
					mapred_obj_error(obj, "TARGET '%s' is not an OUTPUT object",
									 obj->u.task.output.name);
					XRAISE(MAPRED_PARSE_ERROR, "Object Resolution Failure");
				}
				if (!sub && obj->u.task.output.name)
				{
					mapred_obj_error(obj, "TARGET '%s' is not defined in "
									 "document",
									 obj->u.task.output.name);
					XRAISE(MAPRED_PARSE_ERROR, "Object Resolution Failure");
				}
			}

			/* clear resolving bit and set resolved bit */
			obj->u.task.flags &= !mapred_task_resolving;
			obj->u.task.flags |= mapred_task_resolved;
			break;
		}

		default:
			XASSERT(false);
	}

	if (global_debug_flag)
		mapred_obj_debug(obj);
}

/*
 * mapred_setup_columns -
 *   setup column lists (input, output, grouping, etc)
 *
 *   This is usually able to be determined directly from the YAML,
 *   but for some things (defined in the database rather than in
 *   the YAML, eg QUERY INPUTS) we can not determine the columns
 *   until the object has been created.  Which can trickle down to
 *   any object that depends on it.
 *
 *   For this reason we don't setup the columns during the parse phase,
 *   but rather just before or just after we actually create the object
 *   once we know that all the dependencies have already been created.
 */
void mapred_setup_columns(PGconn *conn, mapred_object_t *obj)
{
	mapred_object_t *sub;
	PGresult        *result;

	/* switch based on object type */
	switch (obj->kind)
	{
		case MAPRED_ADT:
			break;

		case MAPRED_INPUT:

			/*
			 * Should be called after creation, otherwise catalog queries
			 * could fail.
			 */
			XASSERT(obj->created);

			/* setup the column list for database defined inputs */
			if (obj->u.input.type == MAPRED_INPUT_TABLE ||
				obj->u.input.type == MAPRED_INPUT_QUERY)
			{
				/*
				 * This gets the ordered list of columns for the first
				 * input of the given name in the user's search path.
				 */
				buffer_t *buffer = makebuffer(1024, 1024);
				bufcat(&buffer,
					   "SELECT  attname, "
					   "        pg_catalog.format_type(atttypid, atttypmod)\n"
					   "FROM    pg_catalog.pg_attribute\n"
					   "WHERE   attnum > 0 AND attrelid = lower('");
				if (obj->u.input.type == MAPRED_INPUT_TABLE)
					bufcat(&buffer, obj->u.input.desc);
				else
					bufcat(&buffer, obj->name);
				bufcat(&buffer,
					   "')::regclass\n"
					   "ORDER BY   -attnum;\n\n");

				if (global_debug_flag)
					printf("%s", buffer->buffer);

				result = PQexec(conn, buffer->buffer);
				mapred_free(buffer);

				if (PQresultStatus(result) == PGRES_TUPLES_OK &&
					PQntuples(result) > 0)
				{
					mapred_plist_t *newitem;
					int i;

					/* Destroy any previous default values we setup */
					mapred_destroy_plist(&obj->u.input.columns);

					/*
					 * The columns were sorted reverse order above so
					 * the list can be generated back -> front
					 */
					for (i = 0; i < PQntuples(result); i++)
					{
						char *name = PQgetvalue(result, i, 0);
						char *type = PQgetvalue(result, i, 1);

						/* Add the column to the list */
						newitem = mapred_malloc(sizeof(mapred_plist_t));
						newitem->name = mapred_malloc(strlen(name)+1);
						strncpy(newitem->name, name, strlen(name)+1);
						newitem->type = mapred_malloc(strlen(type)+1);
						strncpy(newitem->type, type, strlen(type)+1);
						newitem->next = obj->u.input.columns;
						obj->u.input.columns = newitem;
					}
				}
				else
				{
					char *error = PQresultErrorField(result, PG_DIAG_SQLSTATE);
					char *name;

					if (obj->u.input.type == MAPRED_INPUT_TABLE)
						name = obj->u.input.desc;
					else
						name = obj->name;

					if (PQresultStatus(result) == PGRES_TUPLES_OK)
					{
						mapred_obj_error(obj, "Table '%s' contains no rows", name);
					}
					else if (!strcmp(error, OBJ_DOES_NOT_EXIST) ||
							 !strcmp(error, SCHEMA_DOES_NOT_EXIST) )
					{
						mapred_obj_error(obj, "Table '%s' not found", name);
					}
					else
					{
						mapred_obj_error(obj, "Table '%s' unknown error: %s", name, error);
					}
					XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure");
				}
				PQclear(result);
			}
			break;


		case MAPRED_OUTPUT:
			break;

		case MAPRED_MAPPER:
		case MAPRED_TRANSITION:
		case MAPRED_COMBINER:
		case MAPRED_FINALIZER:
			XASSERT(obj->u.function.parameters);
			XASSERT(obj->u.function.returns);
			break;


		case MAPRED_REDUCER:
		{
			mapred_object_t *transition = obj->u.reducer.transition.object;

			XASSERT(transition);
			XASSERT(transition->u.function.parameters);
			obj->u.reducer.parameters =
				transition->u.function.parameters->next;

			/*
			 * Use the return result of:
			 *   1) The finalizer
			 *   2) The combiner, or
			 *   3) The transition
			 *
			 * in that order, if the return is not derivable then
			 * fall into the default value of a single text column
			 * named "value"
			 */
			if (obj->u.reducer.finalizer.name)
				sub = obj->u.reducer.finalizer.object;
			else if (obj->u.reducer.combiner.name)
				sub = obj->u.reducer.combiner.object;
			else
				sub = obj->u.reducer.transition.object;

			if (sub)
				obj->u.reducer.returns = sub->u.function.returns;

			if (!obj->u.reducer.returns)
			{
				/*
				 * If unable to determine the returns based on the reducer
				 * components (generally due to use of SQL functions) then
				 * use the default of a single text column named "value".
				 */
				obj->u.reducer.returns = mapred_malloc(sizeof(mapred_plist_t));
				obj->u.reducer.returns->name = "value";
				obj->u.reducer.returns->type = "text";
				obj->u.reducer.returns->next = NULL;
			}
			break;
		}


		case MAPRED_TASK:
		case MAPRED_EXECUTION:
		{
			mapred_plist_t *scan;
			mapred_plist_t *last = NULL;

			/*
			 * The input must either be an INPUT or a TASK
			 */
			sub = obj->u.task.input.object;
			switch (sub->kind)
			{
				case MAPRED_INPUT:
					obj->u.task.parameters = sub->u.input.columns;
					break;

				case MAPRED_TASK:
					/* union the input tasks returns and grouping */
					for (scan = sub->u.task.grouping;
						 scan;
						 scan = scan->next)
					{
						if (!last)
						{
							obj->u.task.parameters =
								mapred_malloc(sizeof(mapred_plist_t));
							last = obj->u.task.parameters;
						}
						else
						{
							last->next =
								mapred_malloc(sizeof(mapred_plist_t));
							last = last->next;
						}
						last->name = scan->name;
						last->type = scan->type;
						last->next = NULL;
					}
					for (scan = sub->u.task.returns;
						 scan;
						 scan = scan->next)
					{
						if (!last)
						{
							obj->u.task.parameters =
								mapred_malloc(sizeof(mapred_plist_t));
							last = obj->u.task.parameters;
						}
						else
						{
							last->next =
								mapred_malloc(sizeof(mapred_plist_t));
							last = last->next;
						}
						last->name = scan->name;
						last->type = scan->type;
						last->next = NULL;
					}
					break;

				default:
					/* Should have already been validated */
					XASSERT(false);
			}

			if (obj->u.task.mapper.name)
			{
				sub = obj->u.task.mapper.object;
				if (!sub)
				{
					/* FIXME: Lookup function in database */
					/* for now... do nothing */
				}
				else
				{
					/* Allow any function type */
					switch (sub->kind)
					{
						case MAPRED_MAPPER:
						case MAPRED_TRANSITION:
						case MAPRED_COMBINER:
						case MAPRED_FINALIZER:
							break;

						default:
							/* Should have already been validated */
							XASSERT(false);
					}
				}
			}

			if (obj->u.task.reducer.name)
			{
				mapred_clist_t *keys;
				mapred_plist_t *source;

				/*
				 * The grouping columns for a task are the columns produced
				 * by the input/mapper that are not consumed by the reducer.
				 *
				 * A special exception is made for a column named "key" which
				 * is always a grouping column.
				 *
				 * FIXME: deal with non-yaml map functions
				 *
				 * FIXME: deal with KEY specifications
				 */
				if (obj->u.task.mapper.object)
					source = obj->u.task.mapper.object->u.function.returns;
				else
					source = obj->u.task.parameters;

				sub = obj->u.task.reducer.object;
				if (!sub)
				{
					/*
					 * The output of a built in function is defined to be
					 * "value", with an input of "value", everything else
					 * is defined to be a grouping column.
					 */
					last = NULL;
					for (scan = source; scan; scan = scan->next)
					{
						if (strcasecmp(scan->name, "value"))
						{
							if (!last)
							{
								obj->u.task.grouping =
									mapred_malloc(sizeof(mapred_plist_t));
								last = obj->u.task.grouping;
							}
							else
							{
								last->next =
									mapred_malloc(sizeof(mapred_plist_t));
								last = last->next;
							}
							last->name = scan->name;
							last->type = scan->type;
							last->next = NULL;
						}
					}
				}
				else
				{
					/* Validate Reducer */
					XASSERT(sub->kind == MAPRED_REDUCER);

					/*
					 * source is the set of input columns that the reducer has
					 * to work with.
					 *
					 * Loop the reducer "keys" clause to determine what keys are
					 * present.
					 */
					last = NULL;
					for (keys = sub->u.reducer.keys; keys; keys = keys->next)
					{
						/*
						 * If there is a '*' in the keys then it catches all
						 * unreferenced columns.
						 */
						if (keys->value[0] == '*' && keys->value[1] == '\0')
						{
							/*
							 * Add all sources not found in either parameters,
							 * or explicitly mentioned in keys
							 */
							for (scan = source; scan; scan = scan->next)
							{
								mapred_plist_t *pscan;
								mapred_clist_t *kscan;

								for (pscan = sub->u.reducer.parameters;
									 pscan;
									 pscan = pscan->next)
								{
									if (!strcasecmp(scan->name, pscan->name))
										break;
								}
								if (pscan)
									continue;   /* found in parameters */
								for (kscan = sub->u.reducer.keys;
									 kscan;
									 kscan = kscan->next)
								{
									if (!strcasecmp(scan->name, kscan->value))
										break;
								}
								if (kscan)
									continue;   /* found in keys */

								/* we have an unmatched source, add to grouping */
								if (!last)
								{
									obj->u.task.grouping =
										mapred_malloc(sizeof(mapred_plist_t));
									last = obj->u.task.grouping;
								}
								else
								{
									last->next =
										mapred_malloc(sizeof(mapred_plist_t));
									last = last->next;
								}
								last->name = scan->name;
								last->type = scan->type;
								last->next = NULL;
							}
						}
						else
						{
							/* Look for the referenced key in the source list */
							for (scan = source; scan; scan = scan->next)
								if (!strcasecmp(keys->value, scan->name))
								{
									/* we have a match, add the key to grouping */
									if (!last)
									{
										obj->u.task.grouping =
											mapred_malloc(sizeof(mapred_plist_t));
										last = obj->u.task.grouping;
									}
									else
									{
										last->next =
											mapred_malloc(sizeof(mapred_plist_t));
										last = last->next;
									}
									last->name = scan->name;
									last->type = scan->type;
									last->next = NULL;
									break;
								}
						}
					}
				}
			}

			/*
			 * If there is a reducer then the "returns" columns are the
			 * output of the reducer, and must be unioned with the grouping
			 * columns for final output.
			 *
			 * If there is no reducer then the returns columns are the
			 * returns columns of the mapper or the input
			 */
			if (obj->u.task.reducer.name)
			{
				/*
				 * If it is a built in function then we'll just fall into the
				 * default of a single text column named "value".
				 */
				sub = obj->u.task.reducer.object;
				if (sub)
					obj->u.task.returns = sub->u.reducer.returns;

			}
			else if (obj->u.task.mapper.name)
			{
				sub = obj->u.task.mapper.object;
				if (sub)
					obj->u.task.returns = sub->u.function.returns;
			}
			else
			{
				obj->u.task.returns = obj->u.task.parameters;
			}

			if (!obj->u.task.returns)
			{
				/*
				 * If unable to determine the returns based on the reducer
				 * components (generally due to use of SQL functions) then
				 * use the default of a single text column named "value".
				 */
				obj->u.task.returns = mapred_malloc(sizeof(mapred_plist_t));
				obj->u.task.returns->name = "value";
				obj->u.task.returns->type = "text";
				obj->u.task.returns->next = NULL;
			}
			break;
		}

		default:
			XASSERT(false);
	}
}

void mapred_resolve_ref(mapred_olist_t *olist, mapred_reference_t *ref)
{
	XASSERT(ref);
	if (!ref->name)
		return;

	/* Scan the list of objects until we find one with a matching name */
	for (; olist; olist = olist->next)
	{
		if (olist->object->name && !strcasecmp(ref->name, olist->object->name))
		{
			ref->object = olist->object;
			return;
		}
	}
}

/* Some basic destructors */
void mapred_destroy_object(mapred_object_t **objh)
{
	mapred_object_t *obj;

	/*
	 * We are passed a handle to the object, get the actual pointer and point
	 * the handle to NULL so that it is not stale once we free the list below.
	 */
	if (!objh || !*objh)
		return;
	obj = *objh;
	*objh = (mapred_object_t *) NULL;

	/* What fields are valid is dependent on what kind of object it is */
	scalarfree(obj->name);
	switch (obj->kind)
	{
		case MAPRED_NO_KIND:
			break;

		case MAPRED_DOCUMENT:
			scalarfree(obj->u.document.version);
			scalarfree(obj->u.document.database);
			scalarfree(obj->u.document.user);
			scalarfree(obj->u.document.host);
			mapred_destroy_olist(&obj->u.document.objects);
			mapred_destroy_olist(&obj->u.document.execute);
			break;

		case MAPRED_INPUT:
			scalarfree(obj->u.input.desc);
			scalarfree(obj->u.input.delimiter);
			scalarfree(obj->u.input.encoding);
			mapred_destroy_clist(&obj->u.input.files);
			mapred_destroy_plist(&obj->u.input.columns);
			break;

		case MAPRED_OUTPUT:
			scalarfree(obj->u.output.desc);
			break;

		case MAPRED_MAPPER:
		case MAPRED_TRANSITION:
		case MAPRED_COMBINER:
		case MAPRED_FINALIZER:
			scalarfree(obj->u.function.body);
			scalarfree(obj->u.function.language);
			mapred_destroy_plist(&obj->u.function.parameters);

			if( obj->internal &&
					obj->u.function.internal_returns != obj->u.function.returns )
				mapred_destroy_plist(&obj->u.function.internal_returns);

			mapred_destroy_plist(&obj->u.function.returns);
			break;

		case MAPRED_REDUCER:
			scalarfree(obj->u.reducer.transition.name);
			scalarfree(obj->u.reducer.combiner.name);
			scalarfree(obj->u.reducer.finalizer.name);
			scalarfree(obj->u.reducer.initialize);
			break;

		case MAPRED_TASK:
		case MAPRED_EXECUTION:
			scalarfree(obj->u.task.input.name);
			scalarfree(obj->u.task.mapper.name);
			scalarfree(obj->u.task.reducer.name);
			scalarfree(obj->u.task.output.name);
			break;

		/*
		 * ADT just borrowed the parameter list from the owning function,
		 * so it has nothing else to delete.
		 */
		case MAPRED_ADT:
			break;

		default:
			XASSERT(false);
	}
}

void mapred_destroy_olist(mapred_olist_t **olisth)
{
	mapred_olist_t *olist;
	mapred_olist_t *next;

	/*
	 * We are passed a handle to the olist, get the actual pointer and point
	 * the handle to NULL so that it is not stale once we free the list below.
	 */
	if (!olisth || !*olisth)
		return;
	olist = *olisth;
	*olisth = (mapred_olist_t *) NULL;

	/* Walk the list destroying each item as we come to it. */
	while (olist)
	{
		mapred_destroy_object(&olist->object);
		next = olist->next;
		mapred_free(olist);
		olist = next;
	}
}


void mapred_destroy_clist(mapred_clist_t **clisth)
{
	mapred_clist_t *clist;
	mapred_clist_t *next;

	/*
	 * We are passed a handle to the olist, get the actual pointer and point
	 * the handle to NULL so that it is not stale once we free the list below.
	 */
	if (!clisth || !*clisth)
		return;
	clist = *clisth;
	*clisth = (mapred_clist_t *) NULL;

	/* Walk the list destroying each item as we come to it. */
	while (clist)
	{
		scalarfree(clist->value);
		next = clist->next;
		mapred_free(clist);
		clist = next;
	}
}

void mapred_destroy_plist(mapred_plist_t **plisth)
{
	mapred_plist_t *plist;
	mapred_plist_t *next;

	/*
	 * We are passed a handle to the olist, get the actual pointer and point
	 * the handle to NULL so that it is not stale once we free the list below.
	 */
	if (!plisth || !*plisth)
		return;
	plist = *plisth;
	*plisth = (mapred_plist_t *) NULL;

	/* Walk the list destroying each item as we come to it. */
	while (plist)
	{
		scalarfree(plist->name);
		scalarfree(plist->type);
		next = plist->next;
		mapred_free(plist);
		plist = next;
	}
}




/* -------------------------------------------------------------------------- */
/* Functions that get things done                                             */
/* -------------------------------------------------------------------------- */
void mapred_run_queries(PGconn *conn, mapred_document_t *doc)
{
	mapred_olist_t  *olist;
	mapred_plist_t  *columns;
	mapred_object_t *output;
	PGresult        *result  = NULL;
	FILE            *outfile = stdout;
	buffer_t        *buffer  = NULL;

	XTRY
	{
		/* allocates 512 bytes, extending by 512 bytes if we run out. */
		buffer = makebuffer(512, 512);

		/* Loop through all objects */
		for (olist = doc->objects; olist; olist = olist->next)
		{
			if (olist->object->kind == MAPRED_EXECUTION)
			{
				boolean exists = false;

				XASSERT(olist->object->name);

				/* Reset the buffer from any previous executions */
				bufreset(buffer);

				output = olist->object->u.task.output.object;

				/*
				 *  [CREATE TABLE <name> AS ]
				 *    SELECT * FROM <name>
				 *    ORDER BY <column-list>
				 */
				if (output && output->u.output.type == MAPRED_OUTPUT_TABLE)
				{
					/* does the table already exist? */
					bufcat(&buffer,
						   "SELECT n.nspname \n"
						   "FROM   pg_catalog.pg_class c JOIN \n"
						   "       pg_catalog.pg_namespace n on \n"
						   "       (c.relnamespace = n.oid) \n"
						   "WHERE  n.nspname = ANY(current_schemas(true)) \n"
						   "  AND  c.relname = lower('");
					bufcat(&buffer, output->u.output.desc);
					bufcat(&buffer, "')");
					result = PQexec(conn, buffer->buffer);
					if (PQresultStatus(result) == PGRES_TUPLES_OK &&
						PQntuples(result) > 0)
						exists = true;
					bufreset(buffer);

					if (exists && output->u.output.mode == MAPRED_OUTPUT_MODE_REPLACE)
					{
						bufcat(&buffer, "DROP TABLE ");
						bufcat(&buffer, output->u.output.desc);
						PQexec(conn, "SAVEPOINT mapreduce_save");
						result = PQexec(conn, buffer->buffer);
						if (PQresultStatus(result) == PGRES_COMMAND_OK)
						{
							PQexec(conn, "RELEASE SAVEPOINT mapreduce_save");
						}
						else
						{
							/* rollback to savepoint */
							PQexec(conn, "ROLLBACK TO SAVEPOINT mapreduce_save");
							PQexec(conn, "RELEASE SAVEPOINT mapreduce_save");

							if (global_verbose_flag)
								fprintf(stderr, "   - ");
							fprintf(stderr, "Error: %s\n",
									PQresultErrorField(result, PG_DIAG_MESSAGE_PRIMARY));
							mapred_obj_error(output, "Table '%s' can't be replaced",
											 output->u.output.desc);
							XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure");
						}
						bufreset(buffer);
						exists = false;
					}

					/* Handle Explain for OUTPUT TABLE */
					if (global_explain_flag & global_analyze)
						bufcat(&buffer, "EXPLAIN ANALYZE ");
					else if (global_explain_flag)
						bufcat(&buffer, "EXPLAIN ");

					if (!exists)
					{
						bufcat(&buffer, "CREATE TABLE ");
						bufcat(&buffer, output->u.output.desc);
						bufcat(&buffer, " AS ");
					}
					else if (output->u.output.mode == MAPRED_OUTPUT_MODE_APPEND)
					{
						bufcat(&buffer, "INSERT INTO ");
						bufcat(&buffer, output->u.output.desc);
						bufcat(&buffer, " (");
					}
					else
					{
						/* exists, mode is neither replace or append => error */
						mapred_obj_error(output, "Table '%s' already exists",
										 output->u.output.desc);
						XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure");
					}
				}
				/* Handle Explain for non-table output */
				else if (global_explain_flag & global_analyze)
				{
					bufcat(&buffer, "EXPLAIN ANALYZE ");
				}
				else if (global_explain_flag)
				{
					bufcat(&buffer, "EXPLAIN ");
				}

				bufcat(&buffer, "SELECT * FROM ");
				bufcat(&buffer, olist->object->name);

				/*
				 * add the DISTRIBUTED BY clause for output tables
				 * OR, the ORDER BY clause for other output formats
				 */
				if (output && output->u.output.type == MAPRED_OUTPUT_TABLE)
				{
					/*
					 * If there are no key columns then leave off the
					 * distributed by clause and let the server choose.
					 */
					if (exists)
						bufcat(&buffer, ")");

					else if (olist->object->u.task.grouping)
					{
						bufcat(&buffer, " DISTRIBUTED BY (");
						columns = olist->object->u.task.grouping;
						while (columns)
						{
							bufcat(&buffer, columns->name);
							if (columns->next)
								bufcat(&buffer, ", ");
							columns = columns->next;
						}
						bufcat(&buffer, ")");
					}
					else
					{
						/*
						 * don't have any hints for what the distribution keys
						 * should be, so we do nothing and let the database
						 * decide
						 */
					}
				}
				else
				{
					if (olist->object->u.task.returns ||
						olist->object->u.task.grouping)
					{
						bufcat(&buffer, " ORDER BY ");
						columns = olist->object->u.task.grouping;
						while (columns)
						{
							bufcat(&buffer, columns->name);
							if (columns->next || olist->object->u.task.returns)
								bufcat(&buffer, ", ");
							columns = columns->next;
						}
						columns = olist->object->u.task.returns;
						while (columns)
						{
							bufcat(&buffer, columns->name);
							if (columns->next)
								bufcat(&buffer, ", ");
							columns = columns->next;
						}
					}
				}
				bufcat(&buffer, ";\n");

				/* Tell the user what job we are running */
				if (global_verbose_flag)
					fprintf(stderr, "  - RUN: ");
				if (global_print_flag || global_debug_flag)
					fprintf(stderr, "%s", buffer->buffer);
				else
					fprintf(stderr, "%s\n", olist->object->name);

				/* But we only execute it if we are not in "print-only" mode */
				if (!global_print_flag)
				{
					/* If we have an output file, open it for write now */
					if (output && output->u.output.type == MAPRED_OUTPUT_FILE)
					{
						switch (output->u.output.mode)
						{
							case MAPRED_OUTPUT_MODE_NONE:
								/* check if the file exists */
								if (access(output->u.output.desc, F_OK) == 0)
								{
									mapred_obj_error(output, "file '%s' already exists",
													 output->u.output.desc);
									XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure");
								}
								/* Fallthrough */

							case MAPRED_OUTPUT_MODE_REPLACE:
								outfile = fopen(output->u.output.desc, "wb");
								break;

							case MAPRED_OUTPUT_MODE_APPEND:
								outfile = fopen(output->u.output.desc, "ab");
								break;

							default:
								XASSERT(false);
						}

						if (!outfile)
						{
							mapred_obj_error(output, "could not open file '%s' for write",
											 output->u.output.desc);
							XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure");
						}
					}
					else
					{
						outfile = stdout;
					}

					/*
					 * Enable notices for user queries since they may contain
					 * debugging info.
					 */
					PQsetNoticeReceiver(conn, print_notice_handler, NULL);
					result = PQexec(conn, buffer->buffer);
					PQsetNoticeReceiver(conn, ignore_notice_handler, NULL);
					switch (PQresultStatus(result))
					{
						/* Output is STDOUT or FILE */
						case  PGRES_TUPLES_OK:
						{
							PQprintOpt options;
							memset(&options, 0, sizeof(options));

							/*
							 * Formatting:
							 *   STDOUT = fancy formatting
							 *   FILE   = plain formatting
							 */
							if (outfile == stdout)
							{
								options.header = true;
								options.align  = true;
								options.fieldSep  = "|";
							}
							else if (output->u.output.delimiter)
							{
								options.fieldSep = output->u.output.delimiter;
							}
							else
							{
								/* "\t" is our default delimiter */
								options.fieldSep  = "\t";
							}

							PQprint(outfile, result, &options);
							break;
						}

						/* OUTPUT is a table */
						case PGRES_COMMAND_OK:
							fprintf(stderr, "DONE\n");
							break;

						/* An error of some kind */
						default:
							XRAISE(MAPRED_SQL_ERROR, "Execution Failure");
					}
					PQclear(result);
					result = NULL;

					if (NULL != outfile && outfile != stdout)
					{
						fclose(outfile);
						outfile = stdout;
					}
				}
			}
		}
	}
	XFINALLY
	{
		if (result)
			PQclear(result);

		if (NULL != outfile && outfile != stdout)
		{
			fclose(outfile);
			outfile = stdout;
		}

		if (buffer)
			mapred_free(buffer);
	}
	XTRY_END;
}


boolean mapred_create_object(PGconn *conn, mapred_document_t *doc,
							 mapred_object_t *obj)
{
	mapred_clist_t *clist    = NULL;
	mapred_plist_t *plist    = NULL;
	mapred_plist_t *plist2   = NULL;
	const char     *ckind    = NULL;
	buffer_t       *buffer   = NULL;
	buffer_t       *qbuffer  = NULL;
	PGresult       *result   = NULL;

	/* If the object was created in a prior pass, then do nothing */
	if (obj->created)
		return true;

	/* Otherwise attempt to create the object */
	XTRY
	{
		/* allocates 1024 bytes, extending by 1024 bytes if we run out */
		buffer = makebuffer(1024, 1024);

		switch (obj->kind)
		{
			case MAPRED_INPUT:
				XASSERT(obj->name);

				switch (obj->u.input.type)
				{
					case MAPRED_INPUT_TABLE:
						/* Nothing to actually create */
						obj->created = true;
						break;

					case MAPRED_INPUT_FILE:
					case MAPRED_INPUT_GPFDIST:
						XASSERT(obj->u.input.files);

						/* Allocate and produce buffer */
						bufcat(&buffer, "CREATE EXTERNAL TABLE ");
						bufcat(&buffer, doc->prefix);
						bufcat(&buffer, obj->name);
						bufcat(&buffer, "(");
						for (plist = obj->u.input.columns;
							 plist;
							 plist = plist->next)
						{
							bufcat(&buffer, plist->name);
							bufcat(&buffer, " ");
							bufcat(&buffer, plist->type);
							if (plist->next)
								bufcat(&buffer, ", ");
						}
						bufcat(&buffer, ")\n");
						bufcat(&buffer, "  LOCATION(");
						for (clist = obj->u.input.files;
							 clist;
							 clist = clist->next)
						{
							char *domain_port, *path, *p = NULL;

							if (obj->u.input.type == MAPRED_INPUT_GPFDIST)
								bufcat(&buffer, "'gpfdist://");
							else
								bufcat(&buffer, "'file://");

							/*
							 * The general syntax of a URL is    scheme://domain:port/path?query_string#fragment_id
							 * clist->value should contain just           domain:port/path?query_string#fragment_id
							 */
							p = strchr(clist->value, '/');
							if (p == NULL)
							{
								mapred_obj_error(obj, "Failed to find '/' indicating start of path (%s)",
												 clist->value);
								XRAISE(MAPRED_PARSE_ERROR,
									   "Invalid INPUT source specification");
							}
							if (p == clist->value)
							{
								mapred_obj_error(obj, "Missing domain and port before '/' indicating start of path (%s)",
												 clist->value);
								XRAISE(MAPRED_PARSE_ERROR,
									   "Invalid INPUT source specification");
							}
							domain_port = clist->value;
							path        = p+1;

							/*
							 * Overwrite the / separating the domain:port from the path
							 * with a nul and move back one byte to check for a trailing ':'.
							 * We put the / back in when copying into the destination buffer.
							 */
							*p-- = '\0';

							if (strlen(path) < 1)
							{
								mapred_obj_error(obj, "Missing path after '/' (%s)",
												 clist->value);
								XRAISE(MAPRED_PARSE_ERROR,
									   "Invalid INPUT source specification");
							}

							/*
							 * We allow a trailing ':'  (e.g. host:/filepath)
							 * but we must not copy it into the external table url.
							 */
							if (*p == ':')
								*p = '\0';

							if (strlen(domain_port) < 1)
							{
								mapred_obj_error(obj, "Missing host before '/' (%s)",
												 clist->value);
								XRAISE(MAPRED_PARSE_ERROR,
									   "Invalid INPUT source specification");
							}

							bufcat(&buffer, domain_port);
							bufcat(&buffer, "/");
							bufcat(&buffer, path);
							if (clist->next)
								bufcat(&buffer, "',\n           ");
						}
						bufcat(&buffer, "')\n");
						if (obj->u.input.format == MAPRED_FORMAT_CSV)
							bufcat(&buffer, "  FORMAT 'CSV'");
						else
							bufcat(&buffer, "  FORMAT 'TEXT'");
						if (obj->u.input.delimiter ||
							obj->u.input.escape    ||
							obj->u.input.quote     ||
							obj->u.input.null)
						{
							bufcat(&buffer, " ( ");
							if (obj->u.input.delimiter)
							{
								bufcat(&buffer, "DELIMITER '");
								bufcat(&buffer, obj->u.input.delimiter);
								bufcat(&buffer, "' ");
							}
							if (obj->u.input.escape)
							{
								bufcat(&buffer, "ESCAPE '");
								bufcat(&buffer, obj->u.input.escape);
								bufcat(&buffer, "' ");
							}
							if (obj->u.input.quote)
							{
								bufcat(&buffer, "QUOTE '");
								bufcat(&buffer, obj->u.input.quote);
								bufcat(&buffer, "' ");
							}
							if (obj->u.input.null)
							{
								bufcat(&buffer, "NULL '");
								bufcat(&buffer, obj->u.input.null);
								bufcat(&buffer, "' ");
							}
							bufcat(&buffer, ")");
						}

						if (obj->u.input.error_limit > 0)
						{
							char intbuf[11];
							snprintf(intbuf, 11, "%d",
									 obj->u.input.error_limit);

							bufcat(&buffer, "\n  SEGMENT REJECT LIMIT ");
							bufcat(&buffer, intbuf);
						}

						bufcat(&buffer, ";\n\n");
						break;

					case MAPRED_INPUT_EXEC:
						XASSERT(obj->u.input.desc);

						bufcat(&buffer, "CREATE EXTERNAL WEB TABLE ");
						bufcat(&buffer, doc->prefix);
						bufcat(&buffer, obj->name);
						bufcat(&buffer, "(");
						for (plist = obj->u.input.columns;
							 plist;
							 plist = plist->next)
						{
							bufcat(&buffer, plist->name);
							bufcat(&buffer, " ");
							bufcat(&buffer, plist->type);
							if (plist->next)
								bufcat(&buffer, ", ");
						}
						bufcat(&buffer, ")\n");
						bufcat(&buffer, "EXECUTE '");
						bufcat(&buffer, obj->u.input.desc);
						bufcat(&buffer, "'\n");
						if (obj->u.input.format == MAPRED_FORMAT_CSV)
							bufcat(&buffer, "  FORMAT 'CSV'");
						else
							bufcat(&buffer, "  FORMAT 'TEXT'");
						if (obj->u.input.delimiter ||
							obj->u.input.quote     ||
							obj->u.input.null)
						{
							bufcat(&buffer, " ( ");
							if (obj->u.input.delimiter)
							{
								bufcat(&buffer, "DELIMITER '");
								bufcat(&buffer, obj->u.input.delimiter);
								bufcat(&buffer, "' ");
							}
							if (obj->u.input.quote)
							{
								bufcat(&buffer, "QUOTE '");
								bufcat(&buffer, obj->u.input.quote);
								bufcat(&buffer, "' ");
							}
							if (obj->u.input.null)
							{
								bufcat(&buffer, "NULL '");
								bufcat(&buffer, obj->u.input.null);
								bufcat(&buffer, "' ");
							}
							bufcat(&buffer, ")");
						}

						if (obj->u.input.error_limit > 0)
						{
							char intbuf[11];
							snprintf(intbuf, 11, "%d",
									 obj->u.input.error_limit);

							bufcat(&buffer, "\n  SEGMENT REJECT LIMIT ");
							bufcat(&buffer, intbuf);
						}
						bufcat(&buffer, ";\n\n");
						break;

					case MAPRED_INPUT_QUERY:
						XASSERT(obj->u.input.desc);

						/*
						 *  CREATE TEMPORARY VIEW <name> AS
						 *  <desc>;
						 */
						bufcat(&buffer, "CREATE TEMPORARY VIEW ");
						bufcat(&buffer, obj->name);
						bufcat(&buffer, " AS\n");
						bufcat(&buffer, obj->u.input.desc);
						bufcat(&buffer, ";\n\n");
						break;

					case MAPRED_INPUT_NONE:
					default:
						XASSERT(false);
				}
				if (global_print_flag || global_debug_flag)
					printf("-- INPUT %s\n", obj->name);
				break;

			case MAPRED_OUTPUT:
				/*
				 * Outputs have no backend objects created directly.
				 * For output tables we may issue a create table as
				 * select, but that occurs at run-time.
				 */
				obj->created = true;
				mapred_setup_columns(conn, obj);
				break;

				/*
				 * The function types have different defaults and generate
				 * slightly different error messages, but basically do the
				 * same thing.
				 */
			case MAPRED_MAPPER:
			case MAPRED_TRANSITION:
			case MAPRED_COMBINER:
			case MAPRED_FINALIZER:
				ckind = mapred_kind_name[obj->kind];

				XASSERT(obj->name);

				/*
				 * 'kind' specific initialization accomplished above, now handle
				 * the generic function creation.
				 */
				if (global_print_flag || global_debug_flag)
					printf("-- %s %s\n", ckind, obj->name);

				/*
				 * Nothing to do if we already looked up the function in the
				 * catalog.
				 */
				if (obj->internal)
				{
					obj->created = true;
					break;
				}

				/* Non-internal functions should have these defined */
				XASSERT(obj->u.function.body);
				XASSERT(obj->u.function.language);

				mapred_setup_columns(conn, obj);
				XASSERT(obj->u.function.parameters);
				XASSERT(obj->u.function.rtype.name);
				XASSERT(NULL == obj->u.function.internal_returns);

				/*
				 * fill in the buffer:
				 *
				 *    CREATE FUNCTION <name>(<parameters>)
				 *    RETURNS [SETOF] <rtype> LANGUAGE <lang> AS
				 *    $$
				 *    <body>
				 *    $$ [STRICT] [IMMUTABLE];
				 *
				 */
				bufcat(&buffer, "CREATE FUNCTION ");
				bufcat(&buffer, doc->prefix);
				bufcat(&buffer, obj->name);
				bufcat(&buffer, "(");

				/* Handle parameter list */
				for (plist = obj->u.function.parameters;
					 plist;
					 plist = plist->next)
				{
					bufcat(&buffer, plist->name);
					bufcat(&buffer, " ");
					bufcat(&buffer, plist->type);
					if (plist->next)
						bufcat(&buffer, ", ");
				}

				/* Handle Return clause */
				bufcat(&buffer, ")\nRETURNS ");
				if (obj->u.function.mode == MAPRED_MODE_MULTI)
					bufcat(&buffer, "SETOF ");
				bufcat(&buffer, obj->u.function.rtype.name);

				/*
				 * Handle LANGUAGE clause, every langauge but 'C' and 'SQL'
				 * has 'pl' prefixing it
				 */
				if (!strcasecmp("C", obj->u.function.language) ||
					!strcasecmp("SQL", obj->u.function.language) ||
					!strncasecmp("PL", obj->u.function.language, 2))
				{
					bufcat(&buffer, " LANGUAGE ");
					bufcat(&buffer, obj->u.function.language);
				}
				else
				{
					bufcat(&buffer, " LANGUAGE pl");
					bufcat(&buffer, obj->u.function.language);
				}

				/* python only has an untrusted form */
				if (!strcasecmp("python", obj->u.function.language))
					bufcat(&buffer, "u");


				bufcat(&buffer, " AS ");

				/*
				 * Handle procedural language specific formatting for the
				 * function definition.
				 *
				 * C language functions are defined using the two parameter
				 * form:  AS "library", "function".
				 *
				 * Perl functions append the yaml file line number via a
				 *  #line declaration.
				 *
				 * Python functions try to append the yaml file line number
				 * by inserting a bunch of newlines.  (only works for runtime
				 * errors, not compiletime errors).
				 */
				if (!strcasecmp("C", obj->u.function.language))
				{
					bufcat(&buffer, "$$");
					bufcat(&buffer, obj->u.function.library);
					bufcat(&buffer, "$$, $$");
					bufcat(&buffer, obj->u.function.body);
					bufcat(&buffer, "$$");
				}
				else if (!strncasecmp("plperl", obj->u.function.language, 6) ||
						 !strncasecmp("perl",   obj->u.function.language, 4))
				{
					char lineno[10];
					snprintf(lineno, sizeof(lineno), "%d", obj->u.function.lineno);
					bufcat(&buffer, "$$\n#line ");
					bufcat(&buffer, lineno);
					bufcat(&buffer, "\n");
					bufcat(&buffer, obj->u.function.body);
					if (buffer->buffer[buffer->position-1] != '\n')
						bufcat(&buffer, "\n");
					bufcat(&buffer, "$$");
				}
				else if (!strncasecmp("plpython", obj->u.function.language, 8) ||
						 !strncasecmp("python",   obj->u.function.language, 6))
				{
					/*
					 * Python very stubborn about not letting you manually
					 * adjust line number.  So instead we take the stupid route
					 * and just insert N newlines.
					*/
					int i;
					bufcat(&buffer, "$$\n");
					for (i = 1; i < obj->u.function.lineno-2; i++)
						bufcat(&buffer, "\n");
					bufcat(&buffer, obj->u.function.body);
					if (buffer->buffer[buffer->position-1] != '\n')
						bufcat(&buffer, "\n");
					bufcat(&buffer, "$$");
				}
				else
				{
					/* Some generic other language, take our best guess */
					bufcat(&buffer, "$$");
					bufcat(&buffer, obj->u.function.body);
					bufcat(&buffer, "$$");
				}

				/* Handle options */
				if (obj->u.function.flags & mapred_function_strict)
					bufcat(&buffer, " STRICT");
				if (obj->u.function.flags & mapred_function_immutable)
					bufcat(&buffer, " IMMUTABLE");

				/* All done */
				bufcat(&buffer, ";\n\n");
				break;

			case MAPRED_REDUCER:
			{
				mapred_object_t *transition = obj->u.reducer.transition.object;
				mapred_object_t *combiner   = obj->u.reducer.combiner.object;
				mapred_object_t *finalizer  = obj->u.reducer.finalizer.object;
				char *state;

				XASSERT(obj->name);
				XASSERT(transition);
				XASSERT(transition->name);

				/*
				 * If the reducer depends on an object that hasn't been created
				 * then return false, it will be resolved during a second pass
				 */
				if ((transition && !transition->created) ||
					(combiner   && !combiner->created)   ||
					(finalizer  && !finalizer->created))
				{
					if (global_print_flag && global_debug_flag)
						printf("-- deferring REDUCE %s\n", obj->name);
					break;
				}
				if (global_print_flag || global_debug_flag)
					printf("-- REDUCE %s\n", obj->name);

				/* Now, set things up to create the thing */
				mapred_setup_columns(conn, obj);
				plist = transition->u.function.parameters;
				XASSERT(plist);          /* state */
				XASSERT(plist->next);    /* parameters */

				if (obj->u.reducer.ordering)
					bufcat(&buffer, "CREATE ORDERED AGGREGATE ");
				else
					bufcat(&buffer, "CREATE AGGREGATE ");
				bufcat(&buffer, doc->prefix);
				bufcat(&buffer, obj->name);
				bufcat(&buffer, " (");

				/*
				 * Get the state type, and write out the aggregate parameters
				 * based on the parameter list of the transition function.
				 */
				plist = transition->u.function.parameters;
				state = plist->type;
				for (plist = plist->next; plist; plist = plist->next)
				{
					bufcat(&buffer, plist->type);
					if (plist->next)
						bufcat(&buffer, ", ");
				}
				bufcat(&buffer, ") (\n");
				bufcat(&buffer, "  stype = ");
				bufcat(&buffer, state);
				if (obj->u.reducer.initialize)
				{
					bufcat(&buffer, ",\n  initcond = '");
					bufcat(&buffer, obj->u.reducer.initialize);
					bufcat(&buffer, "'");
				}
				bufcat(&buffer, ",\n  sfunc = ");
				if (!transition->internal)
					bufcat(&buffer, doc->prefix);
				bufcat(&buffer, transition->name);
				if (combiner)
				{
					bufcat(&buffer, ",\n  combinefunc = ");
					if (!combiner->internal)
						bufcat(&buffer, doc->prefix);
					bufcat(&buffer, combiner->name);
				}

				/*
				 * To handle set returning finalizers the finalizer is pushed
				 * into the task definition rather than being placed in the
				 * uda where it belongs.
				 */
				/*
				 if (obj->u.reducer.finalizer.name)
				 {
				    bufcat(&buffer, ",\n  finalfunc = ");
				    bufcat(&buffer, obj->u.reducer.finalizer.name);
				 }
				*/

				bufcat(&buffer, "\n);\n\n");
				break;
			}

			case MAPRED_TASK:
			case MAPRED_EXECUTION:
			{
				mapred_object_t *input    = obj->u.task.input.object;
				mapred_object_t *mapper   = obj->u.task.mapper.object;
				mapred_object_t *reducer  = obj->u.task.reducer.object;
				mapred_plist_t  *columns  = NULL;
				mapred_plist_t  *ingrouping = NULL;
				mapred_plist_t  *newitem  = NULL;
				mapred_plist_t  *grouping = NULL;
				mapred_plist_t  *last     = NULL;
				mapred_plist_t  *scan     = NULL;
				buffer_t        *swap;

				if (!obj->u.task.execute)
					XASSERT(obj->name);
				XASSERT(obj->u.task.input.name);

				if (!qbuffer)
					qbuffer = makebuffer(1024, 1024);
				else
					bufreset(qbuffer);

				/*
				 * If the task depends on an object that hasn't been created then
				 * return false, it will be resolved during a second pass
				 */
				if ((input   && !input->created)  ||
					(mapper  && !mapper->created) ||
					(reducer && !reducer->created))
				{
					if (global_print_flag && global_debug_flag)
					{
						if (obj->u.task.execute)
							printf("-- deferring EXECUTION\n");
						else
							printf("-- deferring TASK %s\n", obj->name);
					}
					break;
				}

				if (global_print_flag || global_debug_flag)
				{
					if (obj->u.task.execute)
						printf("-- EXECUTION\n");
					else
						printf("-- TASK %s\n", obj->name);
				}

				/*
				 * 1) Handle the INPUT, two cases:
				 *   1a) There is no MAP/REDUCE:  "SELECT * FROM <input>"
				 *   1b) There is a MAP and/or REDUCE:  "<input>"
				 */
				mapred_setup_columns(conn, obj);
				if (!obj->u.task.mapper.name && !obj->u.task.reducer.name)
				{
					/* Allocate the buffer for the input. */
					if (input->u.input.type == MAPRED_INPUT_TABLE)
					{
						bufcat(&qbuffer, "SELECT * FROM ");
						bufcat(&qbuffer, input->u.input.desc);
					}
					else
					{
						bufcat(&qbuffer, "SELECT * FROM ");
						bufcat(&qbuffer, input->name);
					}
				}
				else
				{
					/* Input is just the name or description of the input */
					if (input->u.input.type == MAPRED_INPUT_TABLE)
						bufcat(&qbuffer, input->u.input.desc);
					else
						bufcat(&qbuffer, input->name);
				}

				/*
				 * How we get the columns depends a bit on the input.
				 * Is the input actually an "MAPRED_INPUT" object, or is it
				 * a "MAPRED_TASK" object?
				 */
				switch (input->kind)
				{
					case MAPRED_INPUT:
						columns = input->u.input.columns;
						break;

					case MAPRED_TASK:
						columns    = input->u.task.returns;
						ingrouping = input->u.task.grouping;

						if (!columns)
						{
							mapred_obj_error(obj, "Unable to determine return "
											 "columns for TASK '%s'",
											 obj->u.task.input.name);
							XRAISE(MAPRED_PARSE_INTERNAL, NULL);
						}
						break;

					default:
						mapred_obj_error(obj, "SOURCE '%s' is not an INPUT or "
										 "TASK object",
										 obj->u.task.input.name);
						XRAISE(MAPRED_PARSE_ERROR, "Object creation Error");
						break;
				}
				XASSERT(columns);

				/*
				 * 2) Handle the MAPPER, two cases
				 *  2a) The Mapper returns an generated ADT that needs extraction
				 *           "SELECT key(m), ...
				 *            FROM (SELECT <map(...) as m FROM <input>) mapsubq
				 *  2b) The Mapper returns a single column:
				 *           "SELECT <map>(...) FROM <input>"
				 */
				XASSERT(mapper || !obj->u.task.mapper.name);
				if (mapper)
				{
					plist = mapper->u.function.returns;
					plist2 = mapper->u.function.internal_returns;
					XASSERT(plist);

					if (plist->next)
					{ /* 2a */
						bufcat(&buffer, "SELECT ");
						for (; plist; plist = plist->next )
						{
							if( obj->internal )
							{
								XASSERT(plist2 != NULL);
								bufcat(&buffer, plist2->name);
								plist2 = plist2->next;
							}
							else
							{
								bufcat(&buffer, plist->name);
							}
							bufcat(&buffer, "(m)");
							bufcat(&buffer, " as ");
							bufcat(&buffer, plist->name);
							if (plist->next)
								bufcat(&buffer, ", ");
						}
						bufcat(&buffer, "\nFROM (");
					}

					/* shared code */
					bufcat(&buffer, "SELECT ");
					if (!mapper->internal)
						bufcat(&buffer, doc->prefix);
					bufcat(&buffer, mapper->name);
					bufcat(&buffer, "(");
					plist = mapper->u.function.parameters;
					for (; plist; plist = plist->next)
					{
						/* Check if this parameter is one of the input columns */
						for (scan = columns; scan; scan = scan->next)
							if (!strcasecmp(plist->name, scan->name))
							{
								bufcat(&buffer, plist->name);
								break;
							}

						/* Task inputs also need to scan the grouping columns */
						if (!scan)
							for (scan = ingrouping; scan; scan = scan->next)
								if (!strcasecmp(plist->name, scan->name))
								{
									bufcat(&buffer, plist->name);
									break;
								}

						/* Check if this parameter is in global_plist */
						if (!scan)
							for (scan = global_plist; scan; scan = scan->next)
								if (!strcasecmp(plist->name, scan->name))
								{
									/*
									 * (HACK)
									 * Note that global_plist overloads the
									 * plist structure using the "type" field
									 * to store "value".
									 * (HACK)
									 */
									bufcat(&buffer, "'");
									bufcat(&buffer, scan->type);
									bufcat(&buffer, "'::");
									bufcat(&buffer, plist->type);
									break;
								}


						/*
						 * If we couldn't find it issue a warning and
						 * set to NULL
						 */
						if (!scan)
						{
							if (global_verbose_flag)
								fprintf(stderr, "       ");
							fprintf(stderr,
									"WARNING: unset parameter - "
									"%s(%s => NULL)\n",
									mapper->name, plist->name);
							bufcat(&buffer, "NULL::");
							bufcat(&buffer, plist->type);
						}

						/* Add a comma if there is another parameter */
						if (plist->next)
							bufcat(&buffer, ", ");
					}
					bufcat(&buffer, ") as ");

					/* break into cases again */
					plist = mapper->u.function.returns;
					if (plist->next)
					{  /* 2a */
						bufcat(&buffer, "m FROM ");
						bufcat(&buffer, qbuffer->buffer);
						bufcat(&buffer, ") mapxq\n");
						/*
						 * Need to work this through, it seems that m is true
						 * whenever a column is null, which is not the desired
						 * behavior.
						 *
						 * Look more closely at "grep" code for why we want it,
						 * and "oreilly" code for why we don't.
						 *
						 * For the moment the compromise is that we do it only
						 * for SINGLE mode functions, since MULTI mode can
						 * control it's own filtering without this.
						 */
						if (mapper->u.function.mode != MAPRED_MODE_MULTI)
							bufcat(&buffer, "WHERE m is not null");
					}
					else
					{
						bufcat(&buffer, plist->name);
						bufcat(&buffer, " FROM ");
						bufcat(&buffer, qbuffer->buffer);
					}

					/*
					 * Swap the buffer into the qbuffer for input as the next
					 * stage of the query pipeline.
					 */
					swap = qbuffer;
					qbuffer = buffer;
					buffer = swap;
					bufreset(buffer);

					/* Columns are now the output of the mapper */
					columns  = mapper->u.function.returns;
					ingrouping = NULL;
				}

				/*
				 * 3) Handle the Reducer, several sub-cases:
				 */
				if (obj->u.task.reducer.name)
				{
					/*
					 * Step 1:   Determine grouping columns
					 *    Find which columns are returned from the previous
					 *    stage that are NOT parameters to the reducer.
					 */
					grouping = last = NULL;
					if (!reducer)
					{
						/*
						 * We have a reducer, but it isn't listed in the YAML.
						 * How to work out parameter handling still needs to
						 * be worked out.  For now we just assume that this
						 * sort of function always takes a single "value" column
						 * and returns a "value" column.
						 */
						for (plist = columns; plist; plist = plist->next)
						{
							if (strcasecmp(plist->name, "value"))
							{
								if (grouping)
								{
									last->next =
										mapred_malloc(sizeof(mapred_plist_t));
									last = last->next;
								}
								else
								{
									grouping =
										mapred_malloc(sizeof(mapred_plist_t));
									last = grouping;
								}
								last->name = plist->name;
								last->type = plist->type;
								last->next = NULL;
							}
						}
					}
					else
					{   /* The reducer exists in the YAML */

						/* We precalculated the grouping columns */
						grouping = obj->u.task.grouping;
					}

					/* Fill in the buffer */
					bufcat(&buffer, "SELECT ");
					for (plist = grouping; plist; plist = plist->next)
					{
						bufcat(&buffer, plist->name);
						bufcat(&buffer, ", ");
					}

					/* Call the aggregation function */
					if (reducer && !reducer->internal)
						bufcat(&buffer, doc->prefix);
					bufcat(&buffer, obj->u.task.reducer.name);
					bufcat(&buffer, "(");

					if (reducer)
					{
						plist = reducer->u.reducer.parameters;
						for (; plist; plist = plist->next)
						{
							/* Check if parameter is one of the input columns */
							for (scan = columns; scan; scan = scan->next)
								if (!strcasecmp(plist->name, scan->name))
								{
									bufcat(&buffer, plist->name);
									break;
								}

							/* Task inputs need to scan the grouping columns */
							if (!scan)
								for (scan = ingrouping; scan; scan = scan->next)
									if (!strcasecmp(plist->name, scan->name))
									{
										bufcat(&buffer, plist->name);
										break;
									}

							/* Check if this parameter is in global_plist */
							if (!scan)
							{
								for (scan = global_plist;
									 scan;
									 scan = scan->next)
								{
									if (!strcasecmp(plist->name, scan->name))
									{
										/*
										 * (HACK)
										 * Note that global_plist overloads the
										 * plist structure using the "type"
										 * field to store "value".
										 * (HACK)
										 */
										bufcat(&buffer, "'");
										bufcat(&buffer, scan->type);
										bufcat(&buffer, "'::");
										bufcat(&buffer, plist->type);
										break;
									}
								}
							}

							/*
							 * If we couldn't find it issue a warning
							 * and set to NULL
							 */
							if (!scan)
							{
								if (global_verbose_flag)
									fprintf(stderr, "       ");
								fprintf(stderr,
										"WARNING: unset parameter - "
										"%s(%s => NULL)\n",
										reducer->name, plist->name);
								bufcat(&buffer, "NULL::");
								bufcat(&buffer, plist->type);
							}
							if (plist->next)
								bufcat(&buffer, ", ");
						}

						/* Handle ORDERING, if specified */
						clist = reducer->u.reducer.ordering;
						if (clist)
							bufcat(&buffer, " ORDER BY ");
						for(; clist; clist = clist->next)
						{
							bufcat(&buffer, clist->value);
							if (clist->next)
								bufcat(&buffer, ", ");
						}

					}
					else
					{
						/*
						 * non-yaml reducer always takes "value" as the
						 * input column
						 */

						/* Check if "value" is one of the input columns */
						for (scan = columns; scan; scan = scan->next)
							if (!strcasecmp(scan->name, "value"))
							{
								bufcat(&buffer, "value");
								break;
							}

						/* Task inputs also need to scan the grouping columns */
						if (!scan)
							for (scan = ingrouping; scan; scan = scan->next)
								if (!strcasecmp(plist->name, scan->name))
								{
									bufcat(&buffer, "value");
									break;
								}

						/* Check if this parameter is in global_plist */
						if (!scan)
							for (scan = global_plist; scan; scan = scan->next)
								if (!strcasecmp(scan->name, "value"))
								{
									/*
									 * (HACK)
									 * Note that global_plist overloads the
									 * plist structure using the "type" field
									 * to store "value".
									 * (HACK)
									 */
									bufcat(&buffer, "'");
									bufcat(&buffer, scan->type);
									bufcat(&buffer, "'::");
									bufcat(&buffer, plist->type);
									break;
								}

						if (!scan)
						{
							if (global_verbose_flag)
								fprintf(stderr, "       ");
							fprintf(stderr,
									"WARNING: unset parameter - "
									"%s(value => NULL)\n",
									obj->u.task.reducer.name);
							bufcat(&buffer, "NULL");
						}
					}

					bufcat(&buffer, ") as ");

					if (reducer)
					{
						plist = reducer->u.reducer.returns;
						XASSERT(plist);  /* Need to have a return! */

						/*
						 * If the reducer has a finalizer we push it outside of
						 * the context of the UDA so that we can properly handle
						 * set returning/column returning functions.
						 */
						if (reducer->u.reducer.finalizer.name)
							bufcat(&buffer, "r");
						else
							bufcat(&buffer, plist->name);
					}
					else
					{
						/*
						 * non-yaml reducer always return a single column
						 * named "value"
						 */
						bufcat(&buffer, "value");
					}
					bufcat(&buffer, "\nFROM ");
					if (mapper)
					{
						bufcat(&buffer, "(");
						bufcat(&buffer, qbuffer->buffer);
						bufcat(&buffer, ") mapsubq");
					}
					else
					{
						bufcat(&buffer, qbuffer->buffer);
					}
					if (grouping)
					{
						bufcat(&buffer, "\nGROUP BY ");
						for (plist = grouping; plist; plist = plist->next)
						{
							bufcat(&buffer, plist->name);
							if (plist->next)
								bufcat(&buffer, ", ");
						}
					}

					/*
					 * Swap the buffer into the qbuffer for input as the next
					 * stage of the query pipeline.
					 */
					swap = qbuffer;
					qbuffer = buffer;
					buffer = swap;
					bufreset(buffer);

					/*
					 * Add the return columns to the grouping columns and set
					 * it to the current columns.
					 *
					 * Note that unlike the columns set by the mapper or the
					 * input this is a list that must be de-allocated.
					 */
					columns = last;

					/*
					 * If the reducer had a finalizer we push it into another
					 * nested subquery since user defined aggregates aren't
					 * allowed to return sets.
					 *
					 * NOTE: this code mostly duplicates the MAP code above
					 */
					if (reducer && reducer->u.reducer.finalizer.name)
					{
						mapred_object_t *finalizer;

						finalizer = reducer->u.reducer.finalizer.object;
						XASSERT(finalizer);  /* FIXME */
						XASSERT(finalizer->u.function.returns);

						/*
						 * If the finalizer returns multiple columns then we
						 * need an extra layer of wrapping to extract them.
						 */
						plist = finalizer->u.function.returns;
						if (plist->next)
						{
							bufcat(&buffer, "SELECT ");

							/* the grouping columns */
							for (plist = grouping;
								 plist;
								 plist = plist->next)
							{
								bufcat(&buffer, plist->name);
								bufcat(&buffer, ", ");
							}
							plist2 = finalizer->u.function.internal_returns;
							for (plist = finalizer->u.function.returns;
								 plist;
								 plist = plist->next)
							{
								if( finalizer->internal )
								{
									XASSERT( plist2 != NULL );
									bufcat(&buffer, plist2->name);
									plist2 = plist2->next;
								}
								else
								{
									bufcat(&buffer, plist->name);
								}

								bufcat(&buffer, "(r)");
								bufcat(&buffer, " as ");
								bufcat(&buffer, plist->name);
								if (plist->next)
									bufcat(&buffer, ", ");
							}
							bufcat(&buffer, "\nFROM (");
						}

						/*
						 * Call the function on the returned state from
						 * the reducer.
						 */
						bufcat(&buffer, "SELECT ");

						/* grouping columns */
						for (plist = grouping;
							 plist;
							 plist = plist->next)
						{
							bufcat(&buffer, plist->name);
							bufcat(&buffer, ", ");
						}

						if (!finalizer->internal)
							bufcat(&buffer, doc->prefix);
						bufcat(&buffer, finalizer->name);
						bufcat(&buffer, "(r) as ");

						/* break into cases again */
						plist = finalizer->u.function.returns;
						if (plist->next)
							bufcat(&buffer, "r");
						else
							bufcat(&buffer, plist->name);
						bufcat(&buffer, " FROM (");
						bufcat(&buffer, qbuffer->buffer);
						bufcat(&buffer, ") redxq\n");

						/*
						 * If we have that extra layer of wrapping
						 * then close it off
						 */
						if (finalizer->u.function.returns->next)
							bufcat(&buffer, ") redsubq\n");

						/*
						 * Swap the buffer into the qbuffer for input as the next
						 * stage of the query pipeline.
						 */
						swap = qbuffer;
						qbuffer = buffer;
						buffer = swap;
						bufreset(buffer);
					}
				}


				/*
				 * 4) Handle the final transform into the view definition:
				 *        "CREATE TEMPORARY VIEW . AS .;"
				 */
				bufcat(&buffer, "CREATE TEMPORARY VIEW ");
				bufcat(&buffer, obj->name);
				bufcat(&buffer, " AS\n");
				bufcat(&buffer, qbuffer->buffer);
				bufcat(&buffer, ";\n\n");

				/*
				 * If there was a reducer then we have to release the columns
				 * list, otherwise it is a pointer to an existing list and can
				 * be ignored.
				 */
				if (obj->u.task.reducer.name)
				{
					plist = columns;
					while (plist && plist != grouping)
					{
						newitem = plist;
						plist = plist->next;
						mapred_free(newitem);
					}
				}
				break;
			}

			case MAPRED_ADT:
				XASSERT(obj->name);
				mapred_setup_columns(conn, obj);

				/*
				 * ADT's have generated names that already include the
				 * document prefix
				 */
				bufcat(&buffer, "CREATE TYPE ");
				bufcat(&buffer, obj->name);
				bufcat(&buffer, " as (");
				for (plist = obj->u.adt.returns; plist; plist = plist->next)
				{
					bufcat(&buffer, plist->name);
					bufcat(&buffer, " ");
					bufcat(&buffer, plist->type);
					if (plist->next)
						bufcat(&buffer, ", ");
				}
				bufcat(&buffer, ");\n\n");
				break;

			default:
				XASSERT(false);
		}

		if (buffer->position > 0)
		{

			/*
			 * In print-only mode we do everything but run the queries
			 * ie, we still create and destroy objects.
			 */
			if (global_print_flag || global_debug_flag)
				printf("%s", buffer->buffer);

			/*
			 * Try to create the object, but failure should not terminate
			 * the transaction, so wrap it in a savepoint.
			 */
			PQexec(conn, "SAVEPOINT mapreduce_save");
			result = PQexec(conn, buffer->buffer);
			if (PQresultStatus(result) == PGRES_COMMAND_OK)
			{
				obj->created = true;
				PQexec(conn, "RELEASE SAVEPOINT mapreduce_save");
			}
			else
			{
				char *error = PQresultErrorField(result, PG_DIAG_SQLSTATE);

				/* rollback to savepoint */
				PQexec(conn, "ROLLBACK TO SAVEPOINT mapreduce_save");
				PQexec(conn, "RELEASE SAVEPOINT mapreduce_save");

				/*
				 * If we have an "object does not exist" error from a SQL input
				 * then it may just be a dependency issue, so we don't error
				 * right away.
				 */
				if (obj->kind != MAPRED_INPUT ||
					obj->u.input.type != MAPRED_INPUT_QUERY ||
					strcmp(error, OBJ_DOES_NOT_EXIST))
				{
					if (global_verbose_flag)
						fprintf(stderr, "     - ");
					fprintf(stderr, "%s", PQresultErrorMessage(result));
					XRAISE(MAPRED_SQL_ERROR, "Object creation Failure");
				}
				if (global_verbose_flag)
					fprintf(stderr, "       Error: %s\n",
							PQresultErrorField(result, PG_DIAG_MESSAGE_PRIMARY));

				/*
				 * If it is an error that we think we can recover from then we don't
				 * log the error immediately, but write it to a buffer in the event
				 * that recovery wasn't successful.
				 */
				if (doc->errors)
				{
					if (global_verbose_flag)
						bufcat(&doc->errors, "  - ");
					bufcat(&doc->errors, "Error: ");
					bufcat(&doc->errors, (char*) mapred_kind_name[obj->kind]);
					if (obj->name)
					{
						bufcat(&doc->errors, " '");
						bufcat(&doc->errors, obj->name);
						bufcat(&doc->errors, "'");
					}
					bufcat(&doc->errors, ": ");
					bufcat(&doc->errors, PQresultErrorField(result, PG_DIAG_MESSAGE_PRIMARY));
					if (obj->line > 0)
					{
						char numbuf[64];
						sprintf(numbuf, ", at line %d", obj->line);
						bufcat(&doc->errors, numbuf);
					}
					bufcat(&doc->errors, "\n");
				}
			}
		}

		/*
		 * INPUTS setup columns AFTER creation.
		 * All other objects handle it above prior to creation.
		 */
		if (obj->kind == MAPRED_INPUT && obj->created)
			mapred_setup_columns(conn, obj);
	}
	XFINALLY
	{
		if (buffer)
			mapred_free(buffer);
		if (qbuffer)
			mapred_free(qbuffer);
	}
	XTRY_END;

	return obj->created;
}


void mapred_remove_object(PGconn *conn, mapred_document_t *doc, mapred_object_t *obj)
{
	mapred_plist_t *plist    = NULL;
	const char     *ckind    = NULL;
	buffer_t       *buffer   = NULL;


	/* If the object wasn't created, then do nothing */
	if (!obj->created)
		return;

	if (obj->internal)
	{
		obj->created = false;
		return;
	}

	XTRY
	{
		buffer = makebuffer(100, 100);

		/* Otherwise attempt to create the object */
		switch (obj->kind)
		{
			case MAPRED_INPUT:
				XASSERT(obj->name);

				switch (obj->u.input.type)
				{
					case MAPRED_INPUT_TABLE:
						obj->created = false;
						break;  /* do nothing */

					case MAPRED_INPUT_FILE:
					case MAPRED_INPUT_GPFDIST:
					case MAPRED_INPUT_EXEC:
						bufcat(&buffer, "DROP EXTERNAL TABLE IF EXISTS ");
						bufcat(&buffer, doc->prefix);
						bufcat(&buffer, obj->name);
						bufcat(&buffer, " CASCADE;\n");
						break;

					case MAPRED_INPUT_QUERY:
						bufcat(&buffer, "DROP VIEW IF EXISTS ");
						bufcat(&buffer, obj->name);
						bufcat(&buffer, " CASCADE;\n");
						break;

					case MAPRED_INPUT_NONE:
					default:
						XASSERT(false);
				}
				break;

			case MAPRED_OUTPUT:
				/* nothing to do for outputs */
				obj->created = false;
				break;

				/*
				 * The function types have different defaults and generate
				 * slightly different error messages, but basically do the
				 * same thing.
				 */
			case MAPRED_MAPPER:
			case MAPRED_TRANSITION:
			case MAPRED_COMBINER:
			case MAPRED_FINALIZER:
				ckind = mapred_kind_name[obj->kind];

				bufcat(&buffer, "DROP FUNCTION IF EXISTS ");
				if (!obj->internal)
					bufcat(&buffer, doc->prefix);
				bufcat(&buffer, obj->name);
				bufcat(&buffer, "(");
				for (plist = obj->u.function.parameters;
					 plist;
					 plist = plist->next)
				{  /* Handle parameter list */
					bufcat(&buffer, plist->type);
					if (plist->next)
						bufcat(&buffer, ", ");
				}
				bufcat(&buffer, ") CASCADE;\n");
				break;


			case MAPRED_REDUCER:
			{
				mapred_object_t  *transition = obj->u.reducer.transition.object;

				XASSERT(obj->name);
				XASSERT(transition);   /* is this a good assumption? */

				if (!transition->u.function.parameters)
					break;  /* FIXME */
				XASSERT(transition->u.function.parameters);

				bufcat(&buffer, "DROP AGGREGATE IF EXISTS ");
				if (!obj->internal)
					bufcat(&buffer, doc->prefix);
				bufcat(&buffer, obj->name);
				bufcat(&buffer, "(");

				/*
				 * The first parameter of the transition function is the 'state'
				 * and is not listed as a parameter of the reducer, but all the
				 * rest of the parameters are
				 */
				plist = transition->u.function.parameters;
				for (plist = plist->next; plist; plist = plist->next)
				{
					bufcat(&buffer, plist->type);
					if (plist->next)
						bufcat(&buffer, ", ");
				}
				bufcat(&buffer, ") CASCADE;\n");
				break;
			}

			case MAPRED_TASK:
			case MAPRED_EXECUTION:
				XASSERT(obj->name);

				bufcat(&buffer, "DROP VIEW IF EXISTS ");
				bufcat(&buffer, obj->name);
				bufcat(&buffer, " CASCADE;\n");
				break;

			case MAPRED_ADT:
				bufcat(&buffer, "DROP TYPE IF EXISTS ");
				bufcat(&buffer, obj->name);
				bufcat(&buffer, " CASCADE;\n");
				break;

			default:
				XASSERT(false);
		}

		if (buffer->position > 0)
		{
			PGresult   *result;

			/*
			 * In print-only mode we do everything but run the queries
			 * ie, we still create and destroy objects.
			 */
			if (global_print_flag || global_debug_flag)
				printf("%s", buffer->buffer);

			/* Try to delete the object, but don't raise exception on error */
			result = PQexec(conn, buffer->buffer);
			if (PQresultStatus(result) == PGRES_COMMAND_OK)
				obj->created = false;
			else
			{
				char *error = PQresultErrorField(result, PG_DIAG_SQLSTATE);

				/*
				 * Errors that we can expect/ignore:
				 *
				 *     IN_FAILED_SQL_TRANSACTION -
				 *        another error has occured and the transaction was
				 *        aborted
				 *
				 */
				if (strcmp(error, IN_FAILED_SQL_TRANSACTION))
				{
					if (global_verbose_flag)
						fprintf(stderr, "    - ");
					if (obj->name)
						fprintf(stderr,
								"[WARNING] Error dropping '%s'\n", obj->name);
					else
						fprintf(stderr,
								"[WARNING] Error dropping unnamed object\n");
					if (global_verbose_flag)
						fprintf(stderr, "    - ");
					fprintf(stderr, "%s", PQerrorMessage(conn));
				}
			}
			PQclear(result);
		}
	}
	XFINALLY
	{
		if (buffer)
			mapred_free(buffer);
	}
	XTRY_END;
}

相关信息

greenplumn 源码目录

相关文章

greenplumn main 源码

greenplumn parse 源码

greenplumn yaml_private 源码

0  赞