greenplum parse 源码

  • 2022-08-18
  • 浏览 (407)

greenplum parse 代码

文件路径:/gpcontrib/gpmapreduce/src/parse.c

#include <parser.h>
#include <except.h>
#include <mapred_errors.h>

#include <stdio.h>
#include <yaml_parse.h>
#include <yaml.h>

#include <stdarg.h>

int mapred_parse_error(mapred_parser_t *parser, char *fmt, ...)
	__attribute__((format(PG_PRINTF_ATTRIBUTE, 2, 3)));
int mapred_verify_object(mapred_parser_t *parser, mapred_object_t *obj);

/* -------------------------------------------------------------------------- */
/*
 * mapred_parse_error - report a parse error on stderr.
 *
 * Prints "Error: ", then the kind (and the name, when one is set) of the
 * parser's current object, then the printf-style message built from 'fmt'
 * and the variadic arguments, and finally ", at line N" when the current
 * event's start line is non-zero.  When the parser has a current document,
 * that document is flagged with mapred_document_error.
 *
 * 'parser' may be NULL: in that case no object or line information is
 * printed and no document is flagged.
 *
 * Always returns MAPRED_PARSE_ERROR.
 */
int mapred_parse_error(mapred_parser_t *parser, char *fmt, ...)
{
	/*
	 * Every other use of 'parser' below is NULL-guarded, so the current
	 * object must be fetched under the same guard instead of unconditionally.
	 */
	mapred_object_t *obj = parser ? parser->current_obj : NULL;
	va_list arg;

	/* Verbose output is indented one level deeper while inside a document */
	if (parser && parser->current_doc)
	{
		if (global_verbose_flag)
			fprintf(stderr, "    - ");
		parser->current_doc->u.document.flags |= mapred_document_error;
	}
	else if (global_verbose_flag)
		fprintf(stderr, "  - ");

	fprintf(stderr, "Error: ");
	if (obj)
	{
		if (obj->name)
			fprintf(stderr, "%s '%s': ", mapred_kind_name[obj->kind], obj->name);
		else
			fprintf(stderr, "%s: ", mapred_kind_name[obj->kind]);
	}

	va_start(arg, fmt);
	vfprintf(stderr, fmt, arg);
	va_end(arg);

	/* start_mark.line is reported 1-based; a value of 0 suppresses the suffix */
	if (parser && parser->event.start_mark.line)
		fprintf(stderr, ", at line %d", (int) parser->event.start_mark.line+1);
	fprintf(stderr, "\n");

	return MAPRED_PARSE_ERROR;
}


/*
 * copyscalar - duplicate the NUL-terminated string 's' into memory obtained
 * from malloc() and yield a pointer to the copy.
 *
 * Note: 's' is evaluated twice, and the malloc() result is handed to
 * strcpy() without being checked.
 */
#define copyscalar(s) \
	(strcpy(malloc(strlen(s) + 1), (s)))

/*
 * mapred_parse_string - parse YAML held in a NUL-terminated buffer.
 *
 * Sets up a libyaml parser that reads from 'yaml' (length taken with
 * strlen), runs mapred_parse_yaml() on it, deletes the libyaml parser and
 * returns the resulting document list.
 *
 * Raises MAPRED_PARSE_INTERNAL when the libyaml parser cannot be
 * initialized.
 */
mapred_olist_t* mapred_parse_string(unsigned char *yaml)
{
	yaml_parser_t    yparser;
	mapred_olist_t  *result;

	XASSERT(yaml);

	if (!yaml_parser_initialize(&yparser))
		XRAISE(MAPRED_PARSE_INTERNAL,
			   "YAML parser initialization failed");

	yaml_parser_set_input_string(&yparser, yaml, strlen((char*) yaml));

	result = mapred_parse_yaml(&yparser);
	yaml_parser_delete(&yparser);

	return result;
}

/*
 * mapred_parse_file - parse YAML read from an open stdio stream.
 *
 * Sets up a libyaml parser that reads from 'file', runs
 * mapred_parse_yaml() on it, deletes the libyaml parser and returns the
 * resulting document list.
 *
 * Raises MAPRED_PARSE_INTERNAL when the libyaml parser cannot be
 * initialized.
 */
mapred_olist_t* mapred_parse_file(FILE *file)
{
	yaml_parser_t    yparser;
	mapred_olist_t  *result;

	XASSERT(file);

	if (!yaml_parser_initialize(&yparser))
		XRAISE(MAPRED_PARSE_INTERNAL,
			   "YAML parser initialization failed");

	yaml_parser_set_input_file(&yparser, file);

	result = mapred_parse_yaml(&yparser);
	yaml_parser_delete(&yparser);

	return result;
}

/*
 * mapred_parse_yaml - drive the grammar over an initialized libyaml parser.
 *
 * Builds a zeroed mapred_parser_t around 'yparser', runs yaml_yyparse(),
 * finalizes the last document and then checks the result.  The parse is
 * treated as failed when yaml_yyparse() returns non-zero or when any
 * collected document carries mapred_document_error; in that case the
 * document list is destroyed and MAPRED_PARSE_ERROR is raised.
 *
 * Returns the list of parsed documents on success.
 */
mapred_olist_t* mapred_parse_yaml(yaml_parser_t *yparser)
{
	mapred_parser_t		 parser;
	mapred_olist_t		*cursor;
	int					 failed;
	int					 depth;

	/* Start from an all-zero parser state */
	memset(&parser, 0, sizeof(parser));

	/* Fill in the fields that need a non-zero starting value */
#if USE_FLEX_REENTRANT
	yaml_scalar_yylex_init (&parser.yscanner);
#endif
	parser.yparser = yparser;
	parser.state   = STATE_YAML_PARSE;
	parser.frame   = -1;
	for (depth = 0; depth < MAX_CONTEXT_DEPTH; depth++)
		parser.context[depth] = CONTEXT_NONE;

	/* Run the grammar; a non-zero result signals a grammar error */
	failed = yaml_yyparse(&parser);

	/* The last document is still pending: add it to the list */
	parser_add_document(&parser);

	/* Release the scanner */
#if USE_FLEX_REENTRANT
	yaml_scalar_yylex_destroy(parser.yscanner);
#endif

	/* Look for a document that recorded an error, unless already failed */
	cursor = parser.doclist;
	while (cursor && !failed)
	{
		if (cursor->object->u.document.flags & mapred_document_error)
			failed = true;
		cursor = cursor->next;
	}

	/* On failure nothing is handed back to the caller */
	if (failed)
	{
		mapred_destroy_olist(&parser.doclist);
		XRAISE(MAPRED_PARSE_ERROR, "parse failure");
	}

	return parser.doclist;
}


/*
 * parser_add_document - finish the current document and append it to the
 * parser's document list.
 *
 * Does nothing when there is no current document.  Otherwise the pending
 * object is flushed into the document, the document is verified (a failed
 * verification sets mapred_document_error on it) and a new list node
 * holding the document is linked at the tail of parser->doclist.
 *
 * parser->current_doc itself is left unchanged.  The list node is obtained
 * with malloc() and used without a NULL check.
 */
void parser_add_document(mapred_parser_t *parser)
{
	mapred_olist_t    **link;
	mapred_olist_t     *node;

	if (!parser->current_doc)
		return;

	/* Flush the document's last object into it */
	parser_add_object(parser, MAPRED_NO_KIND);

	/* Verify the now-complete document */
	if (mapred_verify_object(parser, parser->current_doc) != NO_ERROR)
		parser->current_doc->u.document.flags |= mapred_document_error;

	/* Build the list node */
	node = malloc(sizeof(mapred_olist_t));
	node->object = parser->current_doc;
	node->next   = (mapred_olist_t *) NULL;

	/* Walk to the terminating link and hang the node there */
	link = &parser->doclist;
	while (*link)
		link = &(*link)->next;
	*link = node;
}

/*
 * parser_begin_document - start a new YAML document.
 *
 * Any document already in progress is finished and added to the list
 * first.  A zeroed MAPRED_DOCUMENT object is then allocated, given the
 * next document number and the (1-based) line of the current event, and
 * installed as parser->current_doc.
 */
void parser_begin_document(mapred_parser_t *parser)
{
	mapred_object_t *doc;

	/* Close out the previous document, if there is one */
	parser_add_document(parser);

	/* Create and initialize the new document object */
	doc = malloc(sizeof(mapred_object_t));
	parser->current_doc = doc;
	memset(doc, 0, sizeof(mapred_object_t));
	doc->kind = MAPRED_DOCUMENT;
	doc->u.document.id = ++parser->doc_number;
	doc->line = (int) parser->event.start_mark.line+1;

	if (global_verbose_flag)
		fprintf(stderr, "  - Parsing YAML Document %d:\n", parser->doc_number);
}

/*
 * parser_begin_define - note that the current document has a DEFINE list.
 *
 * A document may carry only one DEFINE list: a second one is reported as
 * a parse error, otherwise mapred_document_defines is set on the document.
 */
void parser_begin_define(mapred_parser_t *parser)
{
	XASSERT(parser->current_doc);

	if (!(parser->current_doc->u.document.flags & mapred_document_defines))
	{
		/* First DEFINE list seen for this document */
		parser->current_doc->u.document.flags |= mapred_document_defines;
		return;
	}

	mapred_parse_error(parser, "Duplicate DEFINE list in DOCUMENT");
}

/*
 * parser_begin_execute - note that the current document has an EXECUTE list.
 *
 * A document may carry only one EXECUTE list: a second one is reported as
 * a parse error, otherwise mapred_document_executes is set on the document.
 */
void parser_begin_execute(mapred_parser_t *parser)
{
	XASSERT(parser->current_doc);

	if (!(parser->current_doc->u.document.flags & mapred_document_executes))
	{
		/* First EXECUTE list seen for this document */
		parser->current_doc->u.document.flags |= mapred_document_executes;
		return;
	}

	mapred_parse_error(parser, "Duplicate EXECUTE list in DOCUMENT");
}

/*
 * parser_set_version - record the VERSION of the current document.
 *
 * A second VERSION is reported as a duplicate and ignored.  A version
 * other than 1.0.0.1, 1.0.0.2 or 1.0.0.3 is reported as unrecognized; the
 * value is still stored so that later VERSION entries are flagged as
 * duplicates.
 */
void parser_set_version(mapred_parser_t *parser, char *value)
{
	XASSERT(parser->current_doc);

	if (parser->current_doc->u.document.version)
	{
		mapred_parse_error(parser, "Duplicate Version: %s", value);
		return;
	}

	/*
	 * We have already assured that the value matches a good regex,
	 * but we must still validate that the version itself is supported.
	 *
	 * Compare against the supported versions exactly: a lexicographic
	 * range test would also let through strings such as "1.0.0.10" that
	 * merely sort between "1.0.0.1" and "1.0.0.3".
	 */
	if (strcmp(value, "1.0.0.1") != 0 &&
		strcmp(value, "1.0.0.2") != 0 &&
		strcmp(value, "1.0.0.3") != 0)
	{
		mapred_parse_error(parser, "Unrecognized VERSION");
	}

	parser->current_doc->u.document.version = copyscalar(value);
}

/*
 * parser_set_database - record the DATABASE of the current document.
 *
 * The first value is copied into the document; any later one is reported
 * as a duplicate and ignored.
 */
void parser_set_database(mapred_parser_t *parser, char *value)
{
	XASSERT(parser->current_doc);

	if (!parser->current_doc->u.document.database)
	{
		parser->current_doc->u.document.database = copyscalar(value);
		return;
	}

	mapred_parse_error(parser, "Duplicate Database: %s", value);
}

/*
 * parser_set_user - record the USER of the current document.
 *
 * The first value is copied into the document; any later one is reported
 * as a duplicate and ignored.
 */
void parser_set_user(mapred_parser_t *parser, char *value)
{
	XASSERT(parser->current_doc);

	if (!parser->current_doc->u.document.user)
	{
		parser->current_doc->u.document.user = copyscalar(value);
		return;
	}

	mapred_parse_error(parser, "Duplicate User: %s", value);
}

/*
 * parser_set_host - record the HOST of the current document.
 *
 * The first value is copied into the document; any later one is reported
 * as a duplicate and ignored.
 */
void parser_set_host(mapred_parser_t *parser, char *value)
{
	XASSERT(parser->current_doc);

	if (!parser->current_doc->u.document.host)
	{
		parser->current_doc->u.document.host = copyscalar(value);
		return;
	}

	mapred_parse_error(parser, "Duplicate Host: %s", value);
}

/*
 * parser_set_port - record the PORT of the current document.
 *
 * A port is treated as already set when the stored value is greater than
 * zero; in that case the new value is reported as a duplicate and ignored.
 * A value outside 0..65535 is reported as invalid and ignored.
 */
void parser_set_port(mapred_parser_t *parser, char *value)
{
	long port;

	XASSERT(parser->current_doc);
	if (parser->current_doc->u.document.port > 0)
	{
		mapred_parse_error(parser, "Duplicate Port: %s", value);
		return;
	}

	/*
	 * The parse has already assured that the value consists of a sequence
	 * of digits, so strtol should convert successfully.  It can still
	 * produce a value that does not fit a port number (strtol saturates at
	 * LONG_MAX on overflow), so range-check before narrowing to int rather
	 * than storing a silently truncated value.
	 */
	port = strtol(value, NULL, 10);
	if (port < 0 || port > 65535)
	{
		mapred_parse_error(parser, "Invalid Port: %s", value);
		return;
	}

	parser->current_doc->u.document.port = (int) port;
}


/*
 * parser_add_object - close the object in progress and open a new one.
 *
 * If the parser has a current object it is verified.  On success it is
 * appended to the tail of the current document's object list (and
 * announced on stderr in verbose mode); on failure it is destroyed and
 * the document is flagged with mapred_document_error.
 *
 * Afterwards a new zeroed object of the requested 'kind' becomes the
 * current object, stamped with the (1-based) line of the current event.
 * Passing MAPRED_NO_KIND only flushes the pending object and leaves
 * parser->current_obj NULL; this is how the last object of a document is
 * added.
 */
void parser_add_object(mapred_parser_t *parser, mapred_kind_t kind)
{
	XASSERT(parser->current_doc);

	if (parser->current_obj)
	{
		/* Only a verified object makes it into the document */
		if (mapred_verify_object(parser, parser->current_obj) != NO_ERROR)
		{
			mapred_destroy_object(&parser->current_obj);
			parser->current_doc->u.document.flags |=
				mapred_document_error;
		}
		else
		{
			mapred_olist_t  *node;
			mapred_olist_t **link;

			/* Build the list node */
			node = malloc(sizeof(mapred_olist_t));
			node->object = parser->current_obj;
			node->next   = (mapred_olist_t *) NULL;

			/* Walk to the terminating link and hang the node there */
			link = &parser->current_doc->u.document.objects;
			while (*link)
				link = &(*link)->next;
			*link = node;

			if (global_verbose_flag)
			{
				const char *label;

				XASSERT (node->object->kind > 0 &&
						 node->object->kind <= MAPRED_MAXKIND);

				label = mapred_kind_name[node->object->kind];
				if (node->object->name)
					fprintf(stderr, "    - %s: %s\n", label, node->object->name);
				else
					fprintf(stderr, "    - %s\n", label);
			}
		}
	}

	/* MAPRED_NO_KIND means "flush only": no new object is started */
	if (kind == MAPRED_NO_KIND)
	{
		parser->current_obj = (mapred_object_t *) NULL;
		return;
	}

	/* Start a new empty object of the requested kind */
	parser->current_obj = malloc(sizeof(mapred_object_t));
	memset(parser->current_obj, 0, sizeof(mapred_object_t));
	parser->current_obj->kind = kind;
	parser->current_obj->line = (int) parser->event.start_mark.line+1;
}



/*
 * parser_add_run - start a new RUN (execution) object.
 *
 * An execution object shares the 'task' structure; it is created with
 * kind MAPRED_EXECUTION and has its 'execute' flag set.  Unlike a TASK it
 * neither requires nor supports a NAME.
 */
void parser_add_run(mapred_parser_t *parser)
{
	parser_add_object(parser, MAPRED_EXECUTION);

	/* Mark the freshly created task structure as something to execute */
	parser->current_obj->u.task.execute = true;
}



/*
 * parser_set_name - record the NAME of the current object.
 *
 * A NULL or empty name is reported as invalid and replaced by "?".  If the
 * object already has a name the new one is reported as a duplicate and
 * ignored; otherwise a copy of the (possibly substituted) value is stored.
 */
void parser_set_name(mapred_parser_t *parser, char *value)
{
	XASSERT(parser->current_doc);
	XASSERT(parser->current_obj);
	XASSERT(parser->current_obj->kind == MAPRED_INPUT      ||
		   parser->current_obj->kind == MAPRED_OUTPUT     ||
		   parser->current_obj->kind == MAPRED_MAPPER     ||
		   parser->current_obj->kind == MAPRED_TRANSITION ||
		   parser->current_obj->kind == MAPRED_COMBINER   ||
		   parser->current_obj->kind == MAPRED_FINALIZER  ||
		   parser->current_obj->kind == MAPRED_REDUCER    ||
		   parser->current_obj->kind == MAPRED_TASK);

	/* Missing or empty name: complain and carry on with a placeholder */
	if (value == NULL || value[0] == '\0')
	{
		value = "?";
		mapred_parse_error(parser, "Invalid NAME: %s", value);
	}

	/* Only the first name sticks */
	if (!parser->current_obj->name)
	{
		parser->current_obj->name = copyscalar(value);
		return;
	}

	mapred_parse_error(parser, "Duplicate NAME: %s", value);
}

/*
 * parser_set_table - record the TABLE of the current INPUT or OUTPUT.
 *
 * A NULL or empty table name is reported once as "Invalid TABLE"; the
 * object's source/target type is still set to TABLE if it had none, so
 * that the object is not additionally treated as having no source/target.
 *
 * If the object already has a source/target type, the conflict is
 * reported (duplicate TABLE, or the name of the incompatible setting) and
 * the value is ignored.  Otherwise the type becomes TABLE and a copy of
 * the name is stored as its description.
 */
void parser_set_table(mapred_parser_t *parser, char *value)
{
	XASSERT(parser->current_doc);
	XASSERT(parser->current_obj);
	XASSERT(parser->current_obj->kind == MAPRED_INPUT ||
		   parser->current_obj->kind == MAPRED_OUTPUT);

	/*
	 * The empty-value check lives inside each branch only: checking it
	 * up front as well would print "Invalid TABLE" twice for one mistake.
	 */
	if (parser->current_obj->kind == MAPRED_INPUT)
	{
		if (!value || strlen(value) == 0)
		{
			if (parser->current_obj->u.input.type == MAPRED_INPUT_NONE)
				parser->current_obj->u.input.type = MAPRED_INPUT_TABLE;
			mapred_parse_error(parser, "Invalid TABLE");
			return;
		}

		if (parser->current_obj->u.input.type != MAPRED_INPUT_NONE)
		{
			switch (parser->current_obj->u.input.type)
			{
				case MAPRED_INPUT_TABLE:
					mapred_parse_error(parser,
									   "Duplicate TABLE");
					return;
				case MAPRED_INPUT_FILE:
					mapred_parse_error(parser,
									   "FILE is incompatible with TABLE");
					return;
				case MAPRED_INPUT_GPFDIST:
					mapred_parse_error(parser,
									   "GPFDIST is incompatible with TABLE");
					return;
				case MAPRED_INPUT_QUERY:
					mapred_parse_error(parser,
									   "QUERY is incompatible with TABLE");
					return;
				case MAPRED_INPUT_EXEC:
					/* Name the setting that actually conflicts */
					mapred_parse_error(parser,
									   "EXEC is incompatible with TABLE");
					return;
				default:
					XASSERT(false);
			}
		}
		parser->current_obj->u.input.type = MAPRED_INPUT_TABLE;
		parser->current_obj->u.input.desc = copyscalar(value);
	}
	else
	{
		if (!value || strlen(value) == 0)
		{
			if (parser->current_obj->u.output.type == MAPRED_OUTPUT_NONE)
				parser->current_obj->u.output.type = MAPRED_OUTPUT_TABLE;
			mapred_parse_error(parser, "Invalid TABLE");
			return;
		}

		if (parser->current_obj->u.output.type != MAPRED_OUTPUT_NONE)
		{
			switch (parser->current_obj->u.output.type)
			{
				case MAPRED_OUTPUT_TABLE:
					mapred_parse_error(parser,
									   "Duplicate TABLE");
					return;
				case MAPRED_OUTPUT_FILE:
					mapred_parse_error(parser,
									   "FILE is incompatible with TABLE");
					return;
				default:
					XASSERT(false);
			}
		}
		parser->current_obj->u.output.type = MAPRED_OUTPUT_TABLE;
		parser->current_obj->u.output.desc = copyscalar(value);
	}
}

/*
 * parser_set_query - record the QUERY of the current INPUT.
 *
 * A NULL or empty query is reported as invalid and treated as "".  If the
 * input already has a source type the conflict is reported (duplicate
 * QUERY, or more than one kind of source) and the value is ignored;
 * otherwise the type becomes QUERY and a copy of the text is stored.
 */
void parser_set_query(mapred_parser_t *parser, char *value)
{
	XASSERT(parser->current_doc);
	XASSERT(parser->current_obj);
	XASSERT(parser->current_obj->kind == MAPRED_INPUT);

	if (value == NULL || value[0] == '\0')
	{
		value = "";
		mapred_parse_error(parser, "Invalid QUERY");
	}

	/* No source chosen yet: this query becomes the source */
	if (parser->current_obj->u.input.type == MAPRED_INPUT_NONE)
	{
		parser->current_obj->u.input.type = MAPRED_INPUT_QUERY;
		parser->current_obj->u.input.desc = copyscalar(value);
		return;
	}

	if (parser->current_obj->u.input.type == MAPRED_INPUT_QUERY)
		mapred_parse_error(parser, "Duplicate QUERY for INPUT");
	else
		mapred_parse_error(parser, "INPUT may only specify one of "
						   "FILE, GPFDIST, TABLE, QUERY, EXEC");
}

/*
 * parser_set_exec - record the EXEC command of the current INPUT.
 *
 * A NULL or empty command is reported as invalid and treated as "".  If
 * the input already has a source type the conflict is reported (duplicate
 * EXEC, or more than one kind of source) and the value is ignored;
 * otherwise the type becomes EXEC and a copy of the text is stored.
 */
void parser_set_exec(mapred_parser_t *parser, char *value)
{
	XASSERT(parser->current_doc);
	XASSERT(parser->current_obj);
	XASSERT(parser->current_obj->kind == MAPRED_INPUT);

	if (value == NULL || value[0] == '\0')
	{
		value = "";
		mapred_parse_error(parser, "Invalid EXEC");
	}

	/* No source chosen yet: this command becomes the source */
	if (parser->current_obj->u.input.type == MAPRED_INPUT_NONE)
	{
		parser->current_obj->u.input.type = MAPRED_INPUT_EXEC;
		parser->current_obj->u.input.desc = copyscalar(value);
		return;
	}

	if (parser->current_obj->u.input.type == MAPRED_INPUT_EXEC)
		mapred_parse_error(parser, "Duplicate EXEC for INPUT");
	else
		mapred_parse_error(parser, "INPUT may only specify one of "
						   "FILE, GPFDIST, TABLE, QUERY, EXEC");
}

/*
 * parser_set_format - record the FORMAT of the current INPUT or OUTPUT.
 *
 * Accepts "text" and "csv" (case-insensitive).  Anything else, including
 * NULL, is reported as "Invalid FORMAT".  If the object already has a
 * format, "Duplicate FORMAT" is reported.  In either error case the
 * stored format becomes MAPRED_FORMAT_INVALID.
 */
void parser_set_format(mapred_parser_t *parser, char *value)
{
	mapred_format_t format;
	XASSERT(parser->current_doc);
	XASSERT(parser->current_obj);

	if (value && !strcasecmp(value, "text"))
		format = MAPRED_FORMAT_TEXT;
	else if (value && !strcasecmp(value, "csv"))
		format = MAPRED_FORMAT_CSV;
	else
		format = MAPRED_FORMAT_INVALID;

	switch (parser->current_obj->kind)
	{
		case MAPRED_INPUT:
			/* An unrecognized value is an invalid format, not a duplicate */
			if (format == MAPRED_FORMAT_INVALID)
				mapred_parse_error(parser, "Invalid FORMAT");
			if (parser->current_obj->u.input.format != MAPRED_FORMAT_NONE)
			{
				format = MAPRED_FORMAT_INVALID;
				mapred_parse_error(parser, "Duplicate FORMAT");
			}
			parser->current_obj->u.input.format = format;
			return;

		case MAPRED_OUTPUT:
			/* An unrecognized value is an invalid format, not a duplicate */
			if (format == MAPRED_FORMAT_INVALID)
				mapred_parse_error(parser, "Invalid FORMAT");
			if (parser->current_obj->u.output.format != MAPRED_FORMAT_NONE)
			{
				format = MAPRED_FORMAT_INVALID;
				mapred_parse_error(parser, "Duplicate FORMAT");
			}
			parser->current_obj->u.output.format = format;
			return;

		default:
			XASSERT(false);
	}
}

/*
 * parser_set_delimiter - record the DELIMITER of the current INPUT or
 * OUTPUT.
 *
 * A NULL or empty delimiter is reported as invalid and treated as "".  If
 * the object already has a delimiter the new one is reported as a
 * duplicate and ignored; otherwise a copy of the value is stored.
 */
void parser_set_delimiter(mapred_parser_t *parser, char *value)
{
	XASSERT(parser->current_doc);
	XASSERT(parser->current_obj);

	if (value == NULL || value[0] == '\0')
	{
		value = "";
		mapred_parse_error(parser, "Invalid DELIMITER");
	}

	switch (parser->current_obj->kind)
	{
		case MAPRED_INPUT:
			if (!parser->current_obj->u.input.delimiter)
				parser->current_obj->u.input.delimiter = copyscalar(value);
			else
				mapred_parse_error(parser, "Duplicate DELIMITER");
			return;

		case MAPRED_OUTPUT:
			if (!parser->current_obj->u.output.delimiter)
				parser->current_obj->u.output.delimiter = copyscalar(value);
			else
				mapred_parse_error(parser, "Duplicate DELIMITER");
			return;

		default:
			XASSERT(false);
	}
}

/*
 * parser_set_escape - record the ESCAPE string of the current INPUT.
 *
 * A NULL or empty value is reported as invalid and treated as "".  If the
 * input already has an escape string the new one is reported as a
 * duplicate and ignored; otherwise a copy of the value is stored.
 */
void parser_set_escape(mapred_parser_t *parser, char *value)
{
	XASSERT(parser->current_doc);
	XASSERT(parser->current_obj);
	XASSERT(parser->current_obj->kind == MAPRED_INPUT);

	if (value == NULL || value[0] == '\0')
	{
		value = "";
		mapred_parse_error(parser, "Invalid ESCAPE");
	}

	if (!parser->current_obj->u.input.escape)
	{
		parser->current_obj->u.input.escape = copyscalar(value);
		return;
	}

	mapred_parse_error(parser, "Duplicate ESCAPE");
}


/*
 * parser_set_null - record the NULL string of the current INPUT.
 *
 * A NULL or empty value is reported as invalid; if the input has no NULL
 * string yet, an empty one is stored in its place.  A second NULL setting
 * is reported as a duplicate and ignored.  Otherwise a copy of the value
 * is stored.
 */
void parser_set_null(mapred_parser_t *parser, char *value)
{
	XASSERT(parser->current_doc);
	XASSERT(parser->current_obj);
	XASSERT(parser->current_obj->kind == MAPRED_INPUT);

	if (!value || strlen(value) == 0)
	{
		/*
		 * Only fill in the placeholder when nothing is stored yet:
		 * overwriting an existing string would leak it.
		 */
		if (!parser->current_obj->u.input.null)
			parser->current_obj->u.input.null = copyscalar("");
		mapred_parse_error(parser, "Invalid NULL");
		return;
	}
	if (parser->current_obj->u.input.null)
	{
		mapred_parse_error(parser, "Duplicate NULL");
		return;
	}
	parser->current_obj->u.input.null = copyscalar(value);
}

/*
 * parser_set_quote - record the QUOTE string of the current INPUT.
 *
 * A NULL or empty value is reported as invalid; if the input has no QUOTE
 * string yet, an empty one is stored in its place.  A second QUOTE setting
 * is reported as a duplicate and ignored.  Otherwise a copy of the value
 * is stored.
 */
void parser_set_quote(mapred_parser_t *parser, char *value)
{
	XASSERT(parser->current_doc);
	XASSERT(parser->current_obj);
	XASSERT(parser->current_obj->kind == MAPRED_INPUT);

	if (!value || strlen(value) == 0)
	{
		/*
		 * Only fill in the placeholder when nothing is stored yet:
		 * overwriting an existing string would leak it.
		 */
		if (!parser->current_obj->u.input.quote)
			parser->current_obj->u.input.quote = copyscalar("");
		mapred_parse_error(parser, "Invalid QUOTE");
		return;
	}
	if (parser->current_obj->u.input.quote)
	{
		mapred_parse_error(parser, "Duplicate QUOTE");
		return;
	}
	parser->current_obj->u.input.quote = copyscalar(value);
}


/*
 * parser_set_encoding - record the ENCODING of the current INPUT.
 *
 * A NULL or empty value is reported as invalid; if the input has no
 * encoding yet, an empty one is stored in its place.  A second ENCODING
 * setting is reported as a duplicate and ignored.  Otherwise a copy of the
 * value is stored.
 */
void parser_set_encoding(mapred_parser_t *parser, char *value)
{
	XASSERT(parser->current_doc);
	XASSERT(parser->current_obj);
	XASSERT(parser->current_obj->kind == MAPRED_INPUT);

	if (!value || strlen(value) == 0)
	{
		/*
		 * Only fill in the placeholder when nothing is stored yet:
		 * overwriting an existing string would leak it.
		 */
		if (!parser->current_obj->u.input.encoding)
			parser->current_obj->u.input.encoding = copyscalar("");
		mapred_parse_error(parser, "Invalid ENCODING");
		return;
	}
	if (parser->current_obj->u.input.encoding)
	{
		mapred_parse_error(parser, "Duplicate ENCODING");
		return;
	}
	parser->current_obj->u.input.encoding = copyscalar(value);
}

/*
 * parser_set_error_limit - record the ERROR_LIMIT of the current INPUT.
 *
 * A NULL or empty value is reported as invalid and the limit is set to
 * -1.  A limit is treated as already set when the stored value is greater
 * than zero; the new value is then reported as a duplicate and ignored.
 * Otherwise the value is converted with strtol (base 10) and stored.
 */
void parser_set_error_limit(mapred_parser_t *parser, char *value)
{
	XASSERT(parser->current_doc);
	XASSERT(parser->current_obj);
	XASSERT(parser->current_obj->kind == MAPRED_INPUT);

	if (value == NULL || value[0] == '\0')
	{
		parser->current_obj->u.input.error_limit = -1;
		mapred_parse_error(parser, "Invalid ERROR_LIMIT");
		return;
	}

	if (parser->current_obj->u.input.error_limit <= 0)
	{
		/*
		 * The grammar has already ensured that the value is a sequence of
		 * digits, so the conversion itself is expected to succeed.
		 */
		parser->current_obj->u.input.error_limit =
			(int) strtol(value, NULL, 10);
		return;
	}

	mapred_parse_error(parser, "Duplicate ERROR_LIMIT");
}


/*
 * parser_set_mode - record the MODE of the current function or OUTPUT.
 *
 * Functions (MAP, TRANSITION, CONSOLIDATE, FINALIZE) accept "single" and
 * "multi", where "multi" is only valid for MAP and FINALIZE.  OUTPUT
 * accepts "replace" and "append".  Matching is case-insensitive.
 *
 * An unrecognized value is reported as "Invalid MODE"; a mode on an object
 * that already has one is reported as "Duplicate MODE".  In either error
 * case the stored mode becomes the respective INVALID value.
 */
void parser_set_mode(mapred_parser_t *parser, char *value)
{
	mapred_kind_t kind;

	XASSERT(parser->current_doc);
	XASSERT(parser->current_obj);

	kind = parser->current_obj->kind;

	if (kind == MAPRED_OUTPUT)
	{
		mapred_output_mode_t omode = MAPRED_OUTPUT_MODE_INVALID;

		/* Translate the text into an output mode */
		if (value)
		{
			if (!strcasecmp(value, "replace"))
				omode = MAPRED_OUTPUT_MODE_REPLACE;
			else if (!strcasecmp(value, "append"))
				omode = MAPRED_OUTPUT_MODE_APPEND;
		}

		if (omode == MAPRED_OUTPUT_MODE_INVALID)
			mapred_parse_error(parser, "Invalid MODE");

		if (parser->current_obj->u.output.mode != MAPRED_OUTPUT_MODE_NONE)
		{
			omode = MAPRED_OUTPUT_MODE_INVALID;
			mapred_parse_error(parser, "Duplicate MODE");
		}

		parser->current_obj->u.output.mode = omode;
		return;
	}

	if (kind == MAPRED_MAPPER     ||
		kind == MAPRED_TRANSITION ||
		kind == MAPRED_COMBINER   ||
		kind == MAPRED_FINALIZER)
	{
		mapred_mode_t fmode = MAPRED_MODE_INVALID;

		/* Translate the text into a function mode */
		if (value)
		{
			if (!strcasecmp(value, "single"))
				fmode = MAPRED_MODE_SINGLE;
			else if (!strcasecmp(value, "multi"))
				fmode = MAPRED_MODE_MULTI;
		}

		/* MULTI is only available to MAP and FINALIZE */
		if (fmode == MAPRED_MODE_MULTI &&
			!(kind == MAPRED_MAPPER || kind == MAPRED_FINALIZER))
		{
			fmode = MAPRED_MODE_INVALID;
		}

		if (fmode == MAPRED_MODE_INVALID)
			mapred_parse_error(parser, "Invalid MODE");

		if (parser->current_obj->u.function.mode != MAPRED_MODE_NONE)
		{
			fmode = MAPRED_MODE_INVALID;
			mapred_parse_error(parser, "Duplicate MODE");
		}

		parser->current_obj->u.function.mode = fmode;
		return;
	}

	XASSERT(false);  /* ONLY functions and OUTPUTS have modes */
}

/*
 * parser_set_file - record the FILE of the current OUTPUT.
 *
 * This is only for OUTPUT objects, which have a single file; INPUT
 * objects go through parser_begin_files instead.
 *
 * If the output has no target yet, its type becomes FILE and a copy of
 * the name is stored (a NULL or empty name is reported as invalid and no
 * description is stored).  A second FILE, or a FILE on an output that
 * already names a TABLE, is reported as an error.
 */
void parser_set_file(mapred_parser_t *parser, char *value)
{
	XASSERT(parser->current_doc);
	XASSERT(parser->current_obj);
	XASSERT(parser->current_obj->kind == MAPRED_OUTPUT);

	switch (parser->current_obj->u.output.type)
	{
		case MAPRED_OUTPUT_FILE:
			mapred_parse_error(parser, "Duplicate FILE");
			return;

		case MAPRED_OUTPUT_TABLE:
			mapred_parse_error(parser, "TABLE is incompatible with FILE");
			return;

		case MAPRED_OUTPUT_NONE:
			/* The type is claimed even when the name turns out to be bad */
			parser->current_obj->u.output.type = MAPRED_OUTPUT_FILE;
			if (value == NULL || value[0] == '\0')
				mapred_parse_error(parser, "Invalid FILE");
			else
				parser->current_obj->u.output.desc = copyscalar(value);
			return;

		default:
			XASSERT(false);
	}
}

/*
 * parser_set_transition - record the TRANSITION function name of the
 * current REDUCE object.
 *
 * The first value is copied into the reducer; a later one is reported as
 * a duplicate and ignored.
 */
void parser_set_transition(mapred_parser_t *parser, char *value)
{
	XASSERT(parser->current_doc);
	XASSERT(parser->current_obj);
	XASSERT(parser->current_obj->kind == MAPRED_REDUCER);

	if (!parser->current_obj->u.reducer.transition.name)
	{
		parser->current_obj->u.reducer.transition.name = copyscalar(value);
		return;
	}

	mapred_parse_error(parser, "Duplicate TRANSITION for REDUCE");
}

/*
 * parser_set_combiner - record the CONSOLIDATE function name of the
 * current REDUCE object.
 *
 * The first value is copied into the reducer; a later one is reported as
 * a duplicate and ignored.
 */
void parser_set_combiner(mapred_parser_t *parser, char *value)
{
	XASSERT(parser->current_doc);
	XASSERT(parser->current_obj);
	XASSERT(parser->current_obj->kind == MAPRED_REDUCER);

	if (!parser->current_obj->u.reducer.combiner.name)
	{
		parser->current_obj->u.reducer.combiner.name = copyscalar(value);
		return;
	}

	mapred_parse_error(parser, "Duplicate CONSOLIDATE for REDUCE");
}

/*
 * parser_set_finalizer - record the FINALIZE function name of the current
 * REDUCE object.
 *
 * The first value is copied into the reducer; a later one is reported as
 * a duplicate and ignored.
 */
void parser_set_finalizer(mapred_parser_t *parser, char *value)
{
	XASSERT(parser->current_doc);
	XASSERT(parser->current_obj);
	XASSERT(parser->current_obj->kind == MAPRED_REDUCER);

	if (!parser->current_obj->u.reducer.finalizer.name)
	{
		parser->current_obj->u.reducer.finalizer.name = copyscalar(value);
		return;
	}

	mapred_parse_error(parser, "Duplicate FINALIZE for REDUCE");
}

/*
 * parser_set_initialize - record the INITIALIZE value of the current
 * REDUCE object.
 *
 * The first value is copied into the reducer; a later one is reported as
 * a duplicate and ignored.
 */
void parser_set_initialize(mapred_parser_t *parser, char *value)
{
	XASSERT(parser->current_doc);
	XASSERT(parser->current_obj);
	XASSERT(parser->current_obj->kind == MAPRED_REDUCER);

	if (!parser->current_obj->u.reducer.initialize)
	{
		parser->current_obj->u.reducer.initialize = copyscalar(value);
		return;
	}

	mapred_parse_error(parser, "Duplicate INITIALIZE for REDUCE");
}


/*
 * parser_set_language - record the LANGUAGE of the current function
 * object (MAP, TRANSITION, CONSOLIDATE or FINALIZE).
 *
 * A NULL or empty value is reported as invalid and treated as "".  If the
 * function already has a language the new one is reported as a duplicate
 * and ignored; otherwise a copy of the value is stored.
 */
void parser_set_language(mapred_parser_t *parser, char *value)
{
	XASSERT(parser->current_doc);
	XASSERT(parser->current_obj);
	XASSERT(parser->current_obj->kind == MAPRED_MAPPER     ||
			parser->current_obj->kind == MAPRED_TRANSITION ||
			parser->current_obj->kind == MAPRED_COMBINER   ||
			parser->current_obj->kind == MAPRED_FINALIZER);

	if (value == NULL || value[0] == '\0')
	{
		value = "";
		mapred_parse_error(parser, "Invalid LANGUAGE");
	}

	if (!parser->current_obj->u.function.language)
	{
		parser->current_obj->u.function.language = copyscalar(value);
		return;
	}

	mapred_parse_error(parser, "Duplicate LANGUAGE");
}

/*
 * parser_set_function - record the FUNCTION body of the current function
 * object (MAP, TRANSITION, CONSOLIDATE or FINALIZE).
 *
 * A NULL or empty body is reported as invalid and treated as "".  If the
 * function already has a body the new one is reported as a duplicate and
 * ignored.  Otherwise a copy of the body is stored, together with a line
 * number derived from the current event's start mark.
 */
void parser_set_function(mapred_parser_t *parser, char *value)
{
	int line_adjust;

	XASSERT(parser->current_doc);
	XASSERT(parser->current_obj);
	XASSERT(parser->current_obj->kind == MAPRED_MAPPER     ||
			parser->current_obj->kind == MAPRED_TRANSITION ||
			parser->current_obj->kind == MAPRED_COMBINER   ||
			parser->current_obj->kind == MAPRED_FINALIZER);

	if (value == NULL || value[0] == '\0')
	{
		value = "";
		mapred_parse_error(parser, "Invalid FUNCTION");
	}

	if (parser->current_obj->u.function.body)
	{
		mapred_parse_error(parser, "Duplicate FUNCTION");
		return;
	}
	parser->current_obj->u.function.body = copyscalar(value);

	/*
	 * What the start mark's line refers to depends on how the scalar was
	 * written in the YAML, so the stored line is the start mark's line
	 * plus a per-style correction: two for literal and folded scalars,
	 * one for plain and quoted scalars, none for any other style.
	 */
	switch (parser->event.data.scalar.style)
	{
		case YAML_LITERAL_SCALAR_STYLE:
		case YAML_FOLDED_SCALAR_STYLE:
			line_adjust = 2;
			break;

		case YAML_PLAIN_SCALAR_STYLE:
		case YAML_SINGLE_QUOTED_SCALAR_STYLE:
		case YAML_DOUBLE_QUOTED_SCALAR_STYLE:
			line_adjust = 1;
			break;

		default:
			line_adjust = 0;
			break;
	}

	parser->current_obj->u.function.lineno = parser->event.start_mark.line;
	parser->current_obj->u.function.lineno += line_adjust;
}

/*
 * parser_set_library - record the LIBRARY of the current function object.
 *
 * The "LIBRARY" option belongs to mapreduce yaml schema version 1.0.0.2.
 * C language functions use it to name the code library in which the C
 * function is defined:
 *
 * - MAP:
 *     ...
 *     LIBRARY:  $libdir/libfoo
 *     FUNCTION: myFunc
 *
 * A NULL or empty value is reported as invalid and treated as "".  If the
 * function already has a library the new one is reported as a duplicate
 * and ignored; otherwise a copy of the value is stored.
 */
void parser_set_library(mapred_parser_t *parser, char *value)
{
	XASSERT(parser->current_doc);
	XASSERT(parser->current_obj);
	XASSERT(parser->current_obj->kind == MAPRED_MAPPER     ||
			parser->current_obj->kind == MAPRED_TRANSITION ||
			parser->current_obj->kind == MAPRED_COMBINER   ||
			parser->current_obj->kind == MAPRED_FINALIZER);

	if (value == NULL || value[0] == '\0')
	{
		value = "";
		mapred_parse_error(parser, "Invalid LIBRARY");
	}

	if (!parser->current_obj->u.function.library)
	{
		/*
		 * Whether the document version is >= 1.0.0.2 is validated
		 * during object verification, not here.
		 */
		parser->current_obj->u.function.library = copyscalar(value);
		return;
	}

	mapred_parse_error(parser, "Duplicate LIBRARY");
}

/*
 * parser_set_optimize - placeholder for the OPTIMIZE setting.
 *
 * Not implemented: both arguments are currently ignored.
 */
void parser_set_optimize(mapred_parser_t *parser, char *value)
{
	/* FIXME */
	(void) parser;
	(void) value;
}


/*
 * parser_set_source - record the SOURCE (input name) of the current TASK
 * or RUN object.
 *
 * The first value is copied into the task structure; a later one is
 * reported as a duplicate, naming TASK or RUN as appropriate, and ignored.
 */
void parser_set_source(mapred_parser_t *parser, char *value)
{
	mapred_kind_t kind;

	XASSERT(parser->current_doc);
	XASSERT(parser->current_obj);

	kind = parser->current_obj->kind;
	if (kind != MAPRED_TASK && kind != MAPRED_EXECUTION)
	{
		XASSERT(false);
		return;
	}

	/* TASK and RUN share the task structure; only the message differs */
	if (!parser->current_obj->u.task.input.name)
		parser->current_obj->u.task.input.name = copyscalar(value);
	else if (kind == MAPRED_TASK)
		mapred_parse_error(parser, "Duplicate SOURCE for TASK");
	else
		mapred_parse_error(parser, "Duplicate SOURCE for RUN");
}

/*
 * parser_set_target - record the TARGET (output name) of the current RUN
 * object.
 *
 * The first value is copied into the task structure; a later one is
 * reported as a duplicate and ignored.
 */
void parser_set_target(mapred_parser_t *parser, char *value)
{
	XASSERT(parser->current_doc);
	XASSERT(parser->current_obj);
	XASSERT(parser->current_obj->kind == MAPRED_EXECUTION);

	if (!parser->current_obj->u.task.output.name)
	{
		parser->current_obj->u.task.output.name = copyscalar(value);
		return;
	}

	mapred_parse_error(parser, "Duplicate TARGET for RUN");
}

/*
 * parser_set_mapper - record the MAP (mapper name) of the current TASK or
 * RUN object.
 *
 * The first value is copied into the task structure; a later one is
 * reported as a duplicate, naming TASK or RUN as appropriate, and ignored.
 */
void parser_set_mapper(mapred_parser_t *parser, char *value)
{
	mapred_kind_t kind;

	XASSERT(parser->current_doc);
	XASSERT(parser->current_obj);

	kind = parser->current_obj->kind;
	if (kind != MAPRED_TASK && kind != MAPRED_EXECUTION)
	{
		XASSERT(false);
		return;
	}

	/* TASK and RUN share the task structure; only the message differs */
	if (!parser->current_obj->u.task.mapper.name)
		parser->current_obj->u.task.mapper.name = copyscalar(value);
	else if (kind == MAPRED_TASK)
		mapred_parse_error(parser, "Duplicate MAP for TASK");
	else
		mapred_parse_error(parser, "Duplicate MAP for RUN");
}

/*
 * parser_set_reducer - record the REDUCE (reducer name) of the current
 * TASK or RUN object.
 *
 * The first value is copied into the task structure; a later one is
 * reported as a duplicate, naming TASK or RUN as appropriate, and ignored.
 */
void parser_set_reducer(mapred_parser_t *parser, char *value)
{
	mapred_kind_t kind;

	XASSERT(parser->current_doc);
	XASSERT(parser->current_obj);

	kind = parser->current_obj->kind;
	if (kind != MAPRED_TASK && kind != MAPRED_EXECUTION)
	{
		XASSERT(false);
		return;
	}

	/* TASK and RUN share the task structure; only the message differs */
	if (!parser->current_obj->u.task.reducer.name)
		parser->current_obj->u.task.reducer.name = copyscalar(value);
	else if (kind == MAPRED_TASK)
		mapred_parse_error(parser, "Duplicate REDUCE for TASK");
	else
		mapred_parse_error(parser, "Duplicate REDUCE for RUN");
}

/*
 * parser_begin_ordering - start the ORDERING list of the current REDUCE
 * object.
 *
 * Reports an error if the reducer already has ordering entries.  Whether
 * the document version is >= 1.0.0.3 is validated during object
 * verification, not here.
 */
void parser_begin_ordering(mapred_parser_t *parser)
{
	XASSERT(parser->current_doc);
	XASSERT(parser->current_obj);
	XASSERT(parser->current_obj->kind == MAPRED_REDUCER);

	if (parser->current_obj->u.reducer.ordering)
		mapred_parse_error(parser, "Duplicate ORDERING for REDUCER");
}

/*
 * parser_add_ordering - append one entry to the ORDERING list of the
 * current REDUCE object.
 *
 * A NULL or empty entry is reported as invalid and not added.  Otherwise
 * a copy of the value is linked at the tail of the reducer's ordering
 * list.
 */
void parser_add_ordering(mapred_parser_t *parser, char *value)
{
	mapred_clist_t  *node;
	mapred_clist_t **link;

	XASSERT(parser->current_doc);
	XASSERT(parser->current_obj);
	XASSERT(parser->current_obj->kind == MAPRED_REDUCER);

	/*
	 * An ordering entry can be an arbitrary expression, which is hard to
	 * verify here; only emptiness is checked.  Stricter checks would fit
	 * better in the grammar.
	 */
	if (value == NULL || value[0] == '\0')
	{
		mapred_parse_error(parser, "Invalid ORDERING");
		return;
	}

	/* Build the list node */
	node = malloc(sizeof(mapred_clist_t));
	node->value = copyscalar(value);
	node->next = (mapred_clist_t *) NULL;

	/* Walk to the terminating link and hang the node there */
	link = &parser->current_obj->u.reducer.ordering;
	while (*link)
		link = &(*link)->next;
	*link = node;
}


/* List functions */
/*
 * parser_begin_files - start the FILE list of the current INPUT.
 *
 * If the input has no source type yet it becomes FILE; the individual
 * files are added separately.  Otherwise the conflict is reported:
 * duplicate FILE, or more than one kind of source.
 */
void parser_begin_files(mapred_parser_t *parser)
{
	XASSERT(parser->current_doc);
	XASSERT(parser->current_obj);
	XASSERT(parser->current_obj->kind == MAPRED_INPUT);

	if (parser->current_obj->u.input.type == MAPRED_INPUT_NONE)
	{
		/* files will be added individually */
		parser->current_obj->u.input.type = MAPRED_INPUT_FILE;
		return;
	}

	if (parser->current_obj->u.input.type == MAPRED_INPUT_FILE)
		mapred_parse_error(parser, "Duplicate FILE for INPUT");
	else
		mapred_parse_error(parser, "INPUT may only specify one of "
						   "FILE, GPFDIST, TABLE, QUERY, EXEC");
}

/*
 * parser_begin_gpfdist - start the GPFDIST list of the current INPUT.
 *
 * If the input has no source type yet it becomes GPFDIST.  Otherwise the
 * conflict is reported: duplicate GPFDIST, or more than one kind of
 * source.
 */
void parser_begin_gpfdist(mapred_parser_t *parser)
{
	XASSERT(parser->current_doc);
	XASSERT(parser->current_obj);
	XASSERT(parser->current_obj->kind == MAPRED_INPUT);

	if (parser->current_obj->u.input.type == MAPRED_INPUT_NONE)
	{
		parser->current_obj->u.input.type = MAPRED_INPUT_GPFDIST;
		return;
	}

	if (parser->current_obj->u.input.type == MAPRED_INPUT_GPFDIST)
		mapred_parse_error(parser, "Duplicate GPFDIST for INPUT");
	else
		mapred_parse_error(parser, "INPUT may only specify one of "
						   "FILE, GPFDIST, TABLE, QUERY, EXEC");
}

/*
 * parser_begin_columns - start the COLUMNS list of the current INPUT.
 *
 * Reports an error if the input already has columns.
 */
void parser_begin_columns(mapred_parser_t *parser)
{
	XASSERT(parser->current_doc);
	XASSERT(parser->current_obj);
	XASSERT(parser->current_obj->kind == MAPRED_INPUT);

	if (parser->current_obj->u.input.columns)
		mapred_parse_error(parser, "Duplicate COLUMNS for INPUT");
}

/*
 * parser_begin_parameters - Start a PARAMETERS list on the current function.
 *
 * The current object must be a MAP, TRANSITION, CONSOLIDATE or FINALIZE
 * function; any other kind trips an assertion.  If the function already
 * has a parameter list, a kind-specific "Duplicate PARAMETERS" parse error
 * is reported.
 */
void parser_begin_parameters(mapred_parser_t *parser)
{
	mapred_object_t *func;

	XASSERT(parser->current_doc);
	XASSERT(parser->current_obj);

	func = parser->current_obj;

	/* Only the four function kinds can carry PARAMETERS */
	if (func->kind != MAPRED_MAPPER &&
		func->kind != MAPRED_TRANSITION &&
		func->kind != MAPRED_COMBINER &&
		func->kind != MAPRED_FINALIZER)
	{
		XASSERT(false);
		return;
	}

	/* Nothing attached yet: this is the first PARAMETERS section */
	if (!func->u.function.parameters)
		return;

	/* Pick the message that names the YAML keyword for this kind */
	switch (func->kind)
	{
		case MAPRED_MAPPER:
			mapred_parse_error(parser, "Duplicate PARAMETERS for MAP");
			break;

		case MAPRED_TRANSITION:
			mapred_parse_error(parser, "Duplicate PARAMETERS for TRANSITION");
			break;

		case MAPRED_COMBINER:
			mapred_parse_error(parser, "Duplicate PARAMETERS for CONSOLIDATE");
			break;

		case MAPRED_FINALIZER:
			mapred_parse_error(parser, "Duplicate PARAMETERS for FINALIZE");
			break;

		default:
			break;
	}
}

/*
 * parser_begin_returns - Start a RETURNS list on the current function.
 *
 * The current object must be a MAP, TRANSITION, CONSOLIDATE or FINALIZE
 * function; any other kind trips an assertion.  If the function already
 * has a returns list, a kind-specific "Duplicate RETURNS" parse error is
 * reported.
 */
void parser_begin_returns(mapred_parser_t *parser)
{
	mapred_object_t *func;

	XASSERT(parser->current_doc);
	XASSERT(parser->current_obj);

	func = parser->current_obj;

	/* Only the four function kinds can carry RETURNS */
	if (func->kind != MAPRED_MAPPER &&
		func->kind != MAPRED_TRANSITION &&
		func->kind != MAPRED_COMBINER &&
		func->kind != MAPRED_FINALIZER)
	{
		XASSERT(false);
		return;
	}

	/* Nothing attached yet: this is the first RETURNS section */
	if (!func->u.function.returns)
		return;

	/* Pick the message that names the YAML keyword for this kind */
	switch (func->kind)
	{
		case MAPRED_MAPPER:
			mapred_parse_error(parser, "Duplicate RETURNS for MAP");
			break;

		case MAPRED_TRANSITION:
			mapred_parse_error(parser, "Duplicate RETURNS for TRANSITION");
			break;

		case MAPRED_COMBINER:
			mapred_parse_error(parser, "Duplicate RETURNS for CONSOLIDATE");
			break;

		case MAPRED_FINALIZER:
			mapred_parse_error(parser, "Duplicate RETURNS for FINALIZE");
			break;

		default:
			break;
	}
}

/*
 * parser_begin_keys - Start a KEYS list on the current REDUCER object.
 *
 * Reports a parse error when the reducer already has a key list attached.
 */
void parser_begin_keys(mapred_parser_t *parser)
{
	XASSERT(parser->current_doc);
	XASSERT(parser->current_obj);
	XASSERT(parser->current_obj->kind == MAPRED_REDUCER);

	/* A key list is already attached: report it as a duplicate */
	if (parser->current_obj->u.reducer.keys != NULL)
		mapred_parse_error(parser, "Duplicate KEYS for REDUCER");
}

/*
 * parser_add_file - Append one entry to the current INPUT's file list.
 *
 * Used for both FILE and GPFDIST inputs (the INPUT type must already be
 * one of the two).  A NULL or empty value is reported as "Invalid FILE" or
 * "Invalid GPFDIST" depending on the type.  The value is copied; the new
 * entry is linked at the end of u.input.files.
 */
void parser_add_file(mapred_parser_t *parser, char *value)
{
	mapred_clist_t **tail;
	mapred_clist_t  *entry;

	XASSERT(parser->current_doc);
	XASSERT(parser->current_obj);
	XASSERT(parser->current_obj->kind == MAPRED_INPUT);
	XASSERT(parser->current_obj->u.input.type == MAPRED_INPUT_FILE ||
			parser->current_obj->u.input.type == MAPRED_INPUT_GPFDIST);

	/* Reject missing or empty entries, naming the keyword in use */
	if (value == NULL || value[0] == '\0')
	{
		if (parser->current_obj->u.input.type == MAPRED_INPUT_FILE)
		{
			mapred_parse_error(parser, "Invalid FILE");
			return;
		}
		if (parser->current_obj->u.input.type == MAPRED_INPUT_GPFDIST)
		{
			mapred_parse_error(parser, "Invalid GPFDIST");
			return;
		}
		XASSERT(false);
	}
	/* Todo: improved regex checking on files */

	/* Build the new list node holding a private copy of the value */
	entry = malloc(sizeof(mapred_clist_t));
	entry->value = copyscalar(value);
	entry->next  = (mapred_clist_t *) NULL;

	/* Walk to the terminating link of the list and hook the node in */
	tail = &parser->current_obj->u.input.files;
	while (*tail)
		tail = &(*tail)->next;
	*tail = entry;
}

/*
 * parser_add_column - Append one column definition to the current INPUT.
 *
 * The value must be in one of two forms:
 *    1)   <name>
 *    2)   <name> <datatype>
 * where the datatype defaults to "text".  Tokens are separated by spaces,
 * tabs or carriage returns.  Note that 'value' is tokenized in place and
 * is therefore modified by this call.
 *
 * A NULL, empty, or whitespace-only value, or a value with more than two
 * tokens, is reported as "Invalid COLUMNS" and nothing is added.
 */
void parser_add_column(mapred_parser_t *parser, char *value)
{
	mapred_plist_t *newitem;
	mapred_plist_t *plist;
	char           *name, *type, *tokenizer;

	XASSERT(parser->current_doc);
	XASSERT(parser->current_obj);
	XASSERT(parser->current_obj->kind == MAPRED_INPUT);

	if (!value || strlen(value) == 0)
	{
		mapred_parse_error(parser, "Invalid COLUMNS");
		return;
	}

	/*
	 * A value made up only of separators yields no token at all; without
	 * this check the NULL name would be passed to copyscalar() below.
	 */
	name = strtok_r(value, " \t\r", &tokenizer);
	if (!name)
	{
		mapred_parse_error(parser, "Invalid COLUMNS");
		return;
	}

	type = strtok_r(NULL, " \t\r", &tokenizer);
	if (!type)
		type = "text";  /* type defaults to 'text' */

	/* double check that there's nothing else */
	if (strtok_r(NULL, " \t\r", &tokenizer))
	{
		mapred_parse_error(parser, "Invalid COLUMNS");
		return;
	}

	/* Allocate the new list item */
	newitem = malloc(sizeof(mapred_plist_t));
	newitem->name = copyscalar(name);
	newitem->type = copyscalar(type);
	newitem->next  = (mapred_plist_t *) NULL;

	/* Add the new item into the last slot of the list */
	plist = parser->current_obj->u.input.columns;
	while (plist && plist->next)
		plist = plist->next;
	if (plist)
		plist->next = newitem;
	else
		parser->current_obj->u.input.columns = newitem;
}

/*
 * parser_add_parameter - Append one parameter to the current function.
 *
 * The current object must be a MAP, TRANSITION, CONSOLIDATE or FINALIZE
 * function.  The value must be in one of two forms:
 *    1)   <name>
 *    2)   <name> <datatype>
 * where the datatype defaults to "text".  Tokens are separated by spaces,
 * tabs or carriage returns.  Note that 'value' is tokenized in place and
 * is therefore modified by this call.
 *
 * A NULL, empty, or whitespace-only value, or a value with more than two
 * tokens, is reported as "Invalid PARAMETERS" and nothing is added.
 */
void parser_add_parameter(mapred_parser_t *parser, char *value)
{
	mapred_plist_t *newitem;
	mapred_plist_t *plist;
	char           *name, *type, *tokenizer;

	XASSERT(parser->current_doc);
	XASSERT(parser->current_obj);
	XASSERT(parser->current_obj->kind == MAPRED_MAPPER     ||
			parser->current_obj->kind == MAPRED_TRANSITION ||
			parser->current_obj->kind == MAPRED_COMBINER   ||
			parser->current_obj->kind == MAPRED_FINALIZER);

	if (!value || strlen(value) == 0)
	{
		mapred_parse_error(parser, "Invalid PARAMETERS");
		return;
	}

	/*
	 * A value made up only of separators yields no token at all; without
	 * this check the NULL name would be passed to copyscalar() below.
	 */
	name = strtok_r(value, " \t\r", &tokenizer);
	if (!name)
	{
		mapred_parse_error(parser, "Invalid PARAMETERS");
		return;
	}

	type = strtok_r(NULL, " \t\r", &tokenizer);
	if (!type)
		type = "text";  /* type defaults to 'text' */

	/* double check that there's nothing else */
	if (strtok_r(NULL, " \t\r", &tokenizer))
	{
		mapred_parse_error(parser, "Invalid PARAMETERS");
		return;
	}

	/* Allocate the new list item */
	newitem = malloc(sizeof(mapred_plist_t));
	newitem->name = copyscalar(name);
	newitem->type = copyscalar(type);
	newitem->next  = (mapred_plist_t *) NULL;

	/* Add the new item into the last slot of the list */
	plist = parser->current_obj->u.function.parameters;
	while (plist && plist->next)
		plist = plist->next;
	if (plist)
		plist->next = newitem;
	else
		parser->current_obj->u.function.parameters = newitem;
}

/*
 * parser_add_return - Append one return column to the current function.
 *
 * The current object must be a MAP, TRANSITION, CONSOLIDATE or FINALIZE
 * function.  The value must be in one of two forms:
 *    1)   <name>
 *    2)   <name> <datatype>
 * where the datatype defaults to "text".  Tokens are separated by spaces,
 * tabs or carriage returns.  Note that 'value' is tokenized in place and
 * is therefore modified by this call.
 *
 * A NULL, empty, or whitespace-only value, or a value with more than two
 * tokens, is reported as "Invalid RETURNS" and nothing is added.
 */
void parser_add_return(mapred_parser_t *parser, char *value)
{
	mapred_plist_t *newitem;
	mapred_plist_t *plist;
	char           *name, *type, *tokenizer;

	XASSERT(parser->current_doc);
	XASSERT(parser->current_obj);
	XASSERT(parser->current_obj->kind == MAPRED_MAPPER     ||
			parser->current_obj->kind == MAPRED_TRANSITION ||
			parser->current_obj->kind == MAPRED_COMBINER   ||
			parser->current_obj->kind == MAPRED_FINALIZER);

	if (!value || strlen(value) == 0)
	{
		mapred_parse_error(parser, "Invalid RETURNS");
		return;
	}

	/*
	 * A value made up only of separators yields no token at all; without
	 * this check the NULL name would be passed to copyscalar() below.
	 */
	name = strtok_r(value, " \t\r", &tokenizer);
	if (!name)
	{
		mapred_parse_error(parser, "Invalid RETURNS");
		return;
	}

	type = strtok_r(NULL, " \t\r", &tokenizer);
	if (!type)
		type = "text";  /* type defaults to 'text' */

	/* double check that there's nothing else */
	if (strtok_r(NULL, " \t\r", &tokenizer))
	{
		mapred_parse_error(parser, "Invalid RETURNS");
		return;
	}

	/* Allocate the new list item */
	newitem = malloc(sizeof(mapred_plist_t));
	newitem->name = copyscalar(name);
	newitem->type = copyscalar(type);
	newitem->next  = (mapred_plist_t *) NULL;

	/* Add the new item into the last slot of the list */
	plist = parser->current_obj->u.function.returns;
	while (plist && plist->next)
		plist = plist->next;
	if (plist)
		plist->next = newitem;
	else
		parser->current_obj->u.function.returns = newitem;
}


/*
 * parser_add_key - Append one key to the current REDUCER's key list.
 *
 * A NULL or empty value is reported as "Invalid KEYS" and nothing is
 * added.  Otherwise the value is copied and the new entry is linked at
 * the end of u.reducer.keys.
 */
void parser_add_key(mapred_parser_t *parser, char *value)
{
	mapred_clist_t **tail;
	mapred_clist_t  *entry;

	XASSERT(parser->current_doc);
	XASSERT(parser->current_obj);
	XASSERT(parser->current_obj->kind == MAPRED_REDUCER);

	/* Reject missing or empty keys */
	if (value == NULL || value[0] == '\0')
	{
		mapred_parse_error(parser, "Invalid KEYS");
		return;
	}

	/* Build the new list node holding a private copy of the value */
	entry = malloc(sizeof(mapred_clist_t));
	entry->value = copyscalar(value);
	entry->next  = (mapred_clist_t *) NULL;

	/* Walk to the terminating link of the list and hook the node in */
	tail = &parser->current_obj->u.reducer.keys;
	while (*tail)
		tail = &(*tail)->next;
	*tail = entry;
}



/*
 * mapred_dump_yaml - Given an object, dump its YAML representation to
 *   stdout.  This is the inverse of parsing.
 *
 *   A NULL object is silently ignored.  A DOCUMENT recurses into its child
 *   objects: everything that is not an execution is printed under DEFINE,
 *   executions are printed under EXECUTE.  An object of unknown kind raises
 *   MAPRED_PARSE_INTERNAL.
 *
 *   NOTE(review): QUERY and FUNCTION bodies are printed with a single
 *   indented "%s", so only the first line of a multi-line body receives
 *   the block indentation.
 *
 *   (*) Could be re-written to avoid code duplication issues.
 */
void mapred_dump_yaml(mapred_object_t *obj)
{
	char *ckind = NULL;

	if (!obj)
		return;

	switch (obj->kind)
	{
		case MAPRED_DOCUMENT:
			printf("---\n");
			/* Dumping the current version */
			printf("VERSION:          1.0.0.3\n");
			if (obj->u.document.database)
				printf("DATABASE:         %s\n", obj->u.document.database);
			if (obj->u.document.user)
				printf("USER:             %s\n", obj->u.document.user);
			if (obj->u.document.host)
				printf("HOST:             %s\n", obj->u.document.host);
			if (obj->u.document.port > 0)
				printf("PORT:             %d\n", obj->u.document.port);
			if (obj->u.document.flags & mapred_document_defines)
			{
				mapred_olist_t *sub;
				printf("DEFINE:\n");
				/* Everything except executions belongs to DEFINE */
				for (sub = obj->u.document.objects; sub; sub = sub->next)
					if (sub->object->kind != MAPRED_EXECUTION)
						mapred_dump_yaml(sub->object);
			}
			if (obj->u.document.flags & mapred_document_executes)
			{
				mapred_olist_t *sub;
				printf("EXECUTE:\n");
				for (sub = obj->u.document.objects; sub; sub = sub->next)
					if (sub->object->kind == MAPRED_EXECUTION)
						mapred_dump_yaml(sub->object);
			}
			break;

		case MAPRED_INPUT:
			printf("  - INPUT:\n");
			if (obj->name)
				printf("      NAME:       %s\n", obj->name);
			if (obj->u.input.columns)
			{
				mapred_plist_t *plist;
				printf("      COLUMNS:\n");
				for (plist = obj->u.input.columns; plist; plist = plist->next)
					printf("        - %s %s\n", plist->name, plist->type);
			}
			if (obj->u.input.delimiter)
				printf("      DELIMITER:  %s\n", obj->u.input.delimiter);
			if (obj->u.input.encoding)
				printf("      ENCODING:   %s\n", obj->u.input.encoding);
			switch (obj->u.input.format)
			{
				case MAPRED_FORMAT_NONE:
					break;
				case MAPRED_FORMAT_TEXT:
					printf("      FORMAT:     TEXT\n");
					break;
				case MAPRED_FORMAT_CSV:
					printf("      FORMAT:     CSV\n");
					break;
				default:
					XASSERT(false);
			}
			switch (obj->u.input.type)
			{
				case MAPRED_INPUT_NONE:
					break;
				case MAPRED_INPUT_FILE:
				{
					mapred_clist_t *clist;
					printf("      FILE:\n");
					for (clist = obj->u.input.files; clist; clist = clist->next)
						printf("        - %s\n", clist->value);
					break;
				}
				case MAPRED_INPUT_GPFDIST:
				{
					/*
					 * GPFDIST entries are collected by parser_add_file()
					 * into the same u.input.files list as FILE entries,
					 * so they are dumped as a list as well.
					 */
					mapred_clist_t *clist;
					printf("      GPFDIST:\n");
					for (clist = obj->u.input.files; clist; clist = clist->next)
						printf("        - %s\n", clist->value);
					break;
				}
				case MAPRED_INPUT_TABLE:
					printf("      TABLE:      %s\n", obj->u.input.desc);
					break;
				case MAPRED_INPUT_QUERY:
					printf("      QUERY: |\n");
					printf("         %s\n", obj->u.input.desc);
					break;
				case MAPRED_INPUT_EXEC:
					printf("      EXEC:       %s\n", obj->u.input.desc);
					break;
				default:
					XASSERT(false);
			}
			break;

		case MAPRED_OUTPUT:
			printf("  - OUTPUT:\n");
			if (obj->name)
				printf("      NAME:       %s\n", obj->name);
			switch (obj->u.output.mode)
			{
				case MAPRED_OUTPUT_MODE_NONE:
					break;
				case MAPRED_OUTPUT_MODE_REPLACE:
					printf("      MODE:       REPLACE\n");
					break;
				case MAPRED_OUTPUT_MODE_APPEND:
					printf("      MODE:       APPEND\n");
					break;
				default:
					XASSERT(false);
			}
			switch (obj->u.output.type)
			{
				case MAPRED_OUTPUT_NONE:
					break;
				case MAPRED_OUTPUT_FILE:
					printf("      FILE:       %s\n", obj->u.output.desc);
					break;
				case MAPRED_OUTPUT_TABLE:
					printf("      TABLE:      %s\n", obj->u.output.desc);
					break;
				default:
					XASSERT(false);
			}
			break;

		/*
		 * The four function kinds share one dump routine.  Each case sets
		 * the YAML keyword only if an earlier case has not already done
		 * so, then falls through to the common code under FINALIZER.
		 */
		case MAPRED_MAPPER:
			ckind = "MAP";
			/* fallthrough */

		case MAPRED_TRANSITION:
			if (!ckind)
				ckind = "TRANSITION";
			/* fallthrough */

		case MAPRED_COMBINER:
			if (!ckind)
				ckind = "CONSOLIDATE";
			/* fallthrough */

		case MAPRED_FINALIZER:
			if (!ckind)
				ckind = "FINALIZE";

			printf("  - %s:\n", ckind);
			if (obj->name)
				printf("      NAME:       %s\n", obj->name);
			if (obj->u.function.parameters)
			{
				mapred_plist_t *plist;
				printf("      PARAMETERS:\n");
				for (plist = obj->u.function.parameters; plist;
					 plist = plist->next)
					printf("        - %s %s\n", plist->name, plist->type);
			}
			if (obj->u.function.returns)
			{
				mapred_plist_t *plist;
				printf("      RETURNS:\n");
				for (plist = obj->u.function.returns; plist;
					 plist = plist->next)
					printf("        - %s %s\n", plist->name, plist->type);
			}
			switch (obj->u.function.mode)
			{
				case MAPRED_MODE_NONE:
					break;
				case MAPRED_MODE_SINGLE:
					printf("      MODE:       SINGLE\n");
					break;
				case MAPRED_MODE_MULTI:
					printf("      MODE:       MULTI\n");
					break;
				case MAPRED_MODE_ACCUMULATED:
					printf("      MODE:       ACCUMULATED\n");
					break;
				case MAPRED_MODE_WINDOWED:
					printf("      MODE:       WINDOWED\n");
					break;
				default:
					printf("      MODE:       UNKNOWN\n");
					break;
			}
			if (obj->u.function.flags)
			{
				printf("      OPTIMIZE:   ");
				if (obj->u.function.flags & mapred_function_strict)
					printf("STRICT ");
				if (obj->u.function.flags & mapred_function_immutable)
					printf("IMMUTABLE ");
				if (obj->u.function.flags & mapred_function_unordered)
					printf("UNORDERED ");
				printf("\n");
			}
			if (obj->u.function.language)
				printf("      LANGUAGE:   %s\n", obj->u.function.language);
			if (obj->u.function.body)
			{
				printf("      FUNCTION: |\n");
				printf("         %s\n", obj->u.function.body);
			}
			break;

		case MAPRED_REDUCER:
			printf("  - REDUCE:\n");
			if (obj->name)
				printf("      NAME:       %s\n", obj->name);
			if (obj->u.reducer.transition.name)
				printf("      TRANSITION: %s\n",
					   obj->u.reducer.transition.name);
			if (obj->u.reducer.combiner.name)
				printf("      CONSOLIDATE:   %s\n",
					   obj->u.reducer.combiner.name);
			if (obj->u.reducer.finalizer.name)
				printf("      FINALIZE:  %s\n",
					   obj->u.reducer.finalizer.name);
			if (obj->u.reducer.initialize)
				printf("      INITIALIZE: %s\n",
					   obj->u.reducer.initialize);
			if (obj->u.reducer.keys)
			{
				/*
				 * KEYS is a sequence, so no "|" literal block indicator:
				 * with it the "- item" lines would be read back as one
				 * string rather than as list entries.
				 */
				mapred_clist_t *clist;
				printf("      KEYS:\n");
				for (clist = obj->u.reducer.keys; clist; clist = clist->next)
					printf("        - %s\n", clist->value);
			}
			if (obj->u.reducer.ordering)
			{
				/* ORDERING is a sequence as well; see KEYS above */
				mapred_clist_t *clist;
				printf("      ORDERING:\n");
				for (clist = obj->u.reducer.ordering; clist; clist = clist->next)
					printf("        - %s\n", clist->value);
			}
			break;

		case MAPRED_TASK:
		case MAPRED_EXECUTION:
			/* The execute flag, not the kind, selects the keyword */
			if (obj->u.task.execute)
				printf("  - RUN:\n");
			else
				printf("  - TASK:\n");
			if (obj->name)
				printf("      NAME:       %s\n", obj->name);
			if (obj->u.task.input.name)
				printf("      SOURCE:     %s\n", obj->u.task.input.name);
			if (obj->u.task.mapper.name)
				printf("      MAP:     %s\n", obj->u.task.mapper.name);
			if (obj->u.task.reducer.name)
				printf("      REDUCE:    %s\n", obj->u.task.reducer.name);
			if (obj->u.task.output.name)
				printf("      TARGET:     %s\n", obj->u.task.output.name);
			break;

		case MAPRED_NO_KIND:
		default:
			XRAISE(MAPRED_PARSE_INTERNAL,
				   "Unknown object type");
	}
}


	
/*
 * mapred_verify_object - Check that a parsed object has all of its
 *   required fields and fill in default values for optional ones.
 *
 *   parser  - the parser that produced the object (currently not read).
 *   obj     - the object to verify; must not be NULL.
 *
 *   Each failed check is reported through mapred_obj_error().  The return
 *   value is NO_ERROR when every check passed, otherwise the value
 *   returned by the last mapred_obj_error() call that was made.
 *
 *   Side effects on obj: default columns, delimiter, function body,
 *   parameters, returns, mode and reducer keys may be allocated and
 *   attached, and the task names "IDENTITY" (mapper/reducer) and "STDOUT"
 *   (output) are freed and replaced by NULL.
 */
int mapred_verify_object(mapred_parser_t *parser, mapred_object_t *obj)
{
	int error = NO_ERROR;

	(void) parser;		/* kept for the interface; not needed here */

	XASSERT(obj);

	/* Verify that all required fields are present and valid */
	switch (obj->kind)
	{
		case MAPRED_DOCUMENT:

			/*
			 * If there is a version on the document then it should have
			 * been validated by parser_set_version()
			 */
			if (!obj->u.document.version)
				error = mapred_obj_error(obj, "Missing VERSION");

			break;

		case MAPRED_INPUT:

			/* Validate required fields */
			if (!obj->name)
				error = mapred_obj_error(obj, "Missing NAME");
			if (obj->u.input.type == MAPRED_INPUT_NONE)
				error = mapred_obj_error(obj,
						  "Missing FILE, GPFDIST, TABLE, QUERY, or EXEC");

			/* set default values */
			if (error == NO_ERROR)
			{
				/* No COLUMNS given: a single text column named "value" */
				if (!obj->u.input.columns)
				{
					obj->u.input.columns = malloc(sizeof(mapred_plist_t));
					obj->u.input.columns->name = copyscalar("value");
					obj->u.input.columns->type = copyscalar("text");
					obj->u.input.columns->next = NULL;
				}
				/* Single-column input without DELIMITER defaults to "off" */
				if (!obj->u.input.columns->next &&
					!obj->u.input.delimiter)
				{
					obj->u.input.delimiter = copyscalar("off");
				}
			}
			break;

		case MAPRED_OUTPUT:

			if (!obj->name)
				error = mapred_obj_error(obj, "Missing NAME");
			if (obj->u.output.type == MAPRED_OUTPUT_NONE)
				error = mapred_obj_error(obj, "Missing FILE or TABLE");
			break;

		case MAPRED_MAPPER:
		case MAPRED_TRANSITION:
		case MAPRED_COMBINER:
		case MAPRED_FINALIZER:

			if (!obj->name)
				error = mapred_obj_error(obj, "Missing NAME");

			/*
			 * We now support "builtin" functions, which are specified by a lack
			 * of an implementation language.  If a language is specified then
			 * a function body is still required.  If a language is not specified
			 * then the function body just defaults to the name of the function.
			 */
			if (obj->name && !obj->u.function.language && !obj->u.function.body)
				obj->u.function.body = copyscalar(obj->name);

			if (obj->u.function.language && !obj->u.function.body)
				error = mapred_obj_error(obj, "Missing FUNCTION");

			/*
			 * LIBRARY is required for "C" language functions.
			 * LIBRARY is invalid for any other language.
			 *
			 * It would be good to verify that LIBRARY is not used in
			 * older YAML formats, but that is difficult given the current
			 * structure of the code.
			 */
			if (obj->u.function.language)
			{
				if (obj->u.function.library)
				{
					if (strcasecmp("C", obj->u.function.language))
					{
						error = mapred_obj_error(obj, "LIBRARY is invalid for "
												 "%s LANGUAGE functions",
												 obj->u.function.language);
					}
				}
				else if (!strcasecmp("C", obj->u.function.language))
				{
					error = mapred_obj_error(obj, "Missing LIBRARY");
				}

				/*
				 * Don't bother filling in default arguments if we already have
				 * an error.
				 */
				if (error)
					break;

				/*
				 * Set default values.
				 *   For builtin functions we delay this so that we can lookup the
				 *   function in the catalog to determine the defaults.
				 */
				if (!obj->u.function.parameters)
				{
					/* First default parameter is always present */
					const char *name = default_parameter_names[obj->kind][0];
					obj->u.function.parameters = malloc(sizeof(mapred_plist_t));
					obj->u.function.parameters->type = copyscalar("text");
					obj->u.function.parameters->name = copyscalar(name);
					obj->u.function.parameters->next = NULL;

					/* Second default parameter only exists for some kinds */
					name = default_parameter_names[obj->kind][1];
					if (name)
					{
						obj->u.function.parameters->next = malloc(sizeof(mapred_plist_t));
						obj->u.function.parameters->next->type = copyscalar("text");
						obj->u.function.parameters->next->name = copyscalar(name);
						obj->u.function.parameters->next->next = NULL;
					}
				}
				else
				{
					/* Explicit PARAMETERS: enforce the per-kind arity */
					switch (obj->kind)
					{
						case MAPRED_TRANSITION:
							if (!obj->u.function.parameters->next)
							{
								error = mapred_obj_error(
									obj,
									"requires at least 2 input parameters [state, arg1, ...]"
									);
							}
							break;

						case MAPRED_COMBINER:
							if (!obj->u.function.parameters->next ||
								obj->u.function.parameters->next->next)
							{
								error = mapred_obj_error(
									obj,
									"requires exactly 2 input parameters [state1, state2]"
									);
							}
							break;

						case MAPRED_FINALIZER:
							if (obj->u.function.parameters->next)
							{
								error = mapred_obj_error(
									obj,
									"requires exactly 1 input parameter [state]"
									);
							}
							break;

						case MAPRED_MAPPER:
						default:
							break;
					}
				}

				if (!obj->u.function.returns)
				{
					/* First default return column is always present */
					const char *name = default_return_names[obj->kind][0];
					obj->u.function.returns = malloc(sizeof(mapred_plist_t));
					obj->u.function.returns->type = copyscalar("text");
					obj->u.function.returns->name = copyscalar(name);
					obj->u.function.returns->next = NULL;

					/* Second default return column only exists for some kinds */
					name = default_return_names[obj->kind][1];
					if (name)
					{
						obj->u.function.returns->next = malloc(sizeof(mapred_plist_t));
						obj->u.function.returns->next->type = copyscalar("text");
						obj->u.function.returns->next->name = copyscalar(name);
						obj->u.function.returns->next->next = NULL;
					}
				}
				else if (obj->kind == MAPRED_TRANSITION ||
						 obj->kind == MAPRED_COMBINER)
				{
					if (obj->u.function.returns->next)
					{
						error = mapred_obj_error(
							obj,
							"requires exactly one output parameter [state]"
							);
					}
				}

				/* Set default mode: depends on type of function */
				if (obj->u.function.mode == MAPRED_MODE_NONE)
				{
					if (obj->kind == MAPRED_TRANSITION ||
						obj->kind == MAPRED_COMBINER)
					{
						obj->u.function.mode = MAPRED_MODE_SINGLE;
					}
					else
					{
						obj->u.function.mode = MAPRED_MODE_MULTI;
					}
				}
			}
			break;

		case MAPRED_REDUCER:

			if (!obj->name)
				error = mapred_obj_error(obj, "Missing NAME");
			if (!obj->u.reducer.transition.name)
				error = mapred_obj_error(obj, "Missing TRANSITION");
			/*
			 * Will verify that functions are valid for reducer input after we
			 * have resolved the pointers.
			 */

			/*
			 * It would be good to verify that ORDERING is not used in
			 * older YAML formats, but that is difficult given the current
			 * structure of the code.
			 */

			/*
			 * ORDERING and COMBINER are incompatible
			 */
			if (obj->u.reducer.ordering != NULL &&
				obj->u.reducer.combiner.name)
			{
				error = mapred_obj_error(obj,
										 "REDUCERS cannot specify both a COMBINER "
										 "function and an ORDERING specification");
			}

			/* Setup default "keys": the two-entry list ("key", "*") */
			if (!obj->u.reducer.keys)
			{
				obj->u.reducer.keys = malloc(sizeof(mapred_clist_t));
				obj->u.reducer.keys->value = copyscalar("key");
				obj->u.reducer.keys->next = malloc(sizeof(mapred_clist_t));
				obj->u.reducer.keys->next->next = NULL;
				obj->u.reducer.keys->next->value = copyscalar("*");
			}

			break;

		case MAPRED_TASK:
			/* Unlike an EXECUTION, a TASK must be named */
			if (!obj->name)
				error = mapred_obj_error(obj, "Missing NAME");

			/* Fallthrough */

		case MAPRED_EXECUTION:

			if (!obj->u.task.input.name)
				error = mapred_obj_error(obj, "Missing SOURCE");

			/* IDENTITY Mappers and Reducers: represented as no name at all */
			if (obj->u.task.mapper.name &&
				!strcasecmp("IDENTITY", obj->u.task.mapper.name))
			{
				free(obj->u.task.mapper.name);
				obj->u.task.mapper.name = NULL;
			}
			if (obj->u.task.reducer.name &&
				!strcasecmp("IDENTITY", obj->u.task.reducer.name))
			{
				free(obj->u.task.reducer.name);
				obj->u.task.reducer.name = NULL;
			}

			/* STDOUT Output: likewise represented as no name at all */
			if (obj->u.task.output.name &&
				!strcasecmp("STDOUT", obj->u.task.output.name))
			{
				free(obj->u.task.output.name);
				obj->u.task.output.name = NULL;
			}
			break;

		case MAPRED_NO_KIND:
		default:
			XASSERT(false);
	}

	return error;
}

相关信息

greenplumn 源码目录

相关文章

greenplumn main 源码

greenplumn mapred 源码

greenplumn yaml_private 源码

0  赞