greenplum parse 源码
greenplum parse 代码
文件路径:/gpcontrib/gpmapreduce/src/parse.c
#include <parser.h>
#include <except.h>
#include <mapred_errors.h>
#include <stdio.h>
#include <yaml_parse.h>
#include <yaml.h>
#include <stdarg.h>
/*
 * Forward declarations.
 *
 * mapred_parse_error takes a printf-style format; the format attribute lets
 * the compiler check the variadic arguments against 'fmt'.
 *
 * mapred_verify_object is not defined in this part of the file; callers here
 * compare its result against NO_ERROR.
 */
int mapred_parse_error(mapred_parser_t *parser, char *fmt, ...)
__attribute__((format(PG_PRINTF_ATTRIBUTE, 2, 3)));
int mapred_verify_object(mapred_parser_t *parser, mapred_object_t *obj);
/* -------------------------------------------------------------------------- */
/*
 * mapred_parse_error - print a formatted parse error on stderr.
 *
 * parser: parser state; may be NULL.  When it has a current document that
 *         document is flagged with mapred_document_error.
 * fmt:    printf-style format followed by its arguments.
 *
 * The message is prefixed with the kind (and name, when set) of the object
 * currently being parsed, and suffixed with the line number of the current
 * YAML event when that line is non-zero.
 *
 * Always returns MAPRED_PARSE_ERROR.
 */
int mapred_parse_error(mapred_parser_t *parser, char *fmt, ...)
{
    /*
     * The rest of this function tolerates a NULL parser, so the current
     * object must only be fetched once the parser is known to be non-NULL.
     */
    mapred_object_t *obj = parser ? parser->current_obj : NULL;
    va_list          arg;

    /* Verbose output is indented the same way with or without a document */
    if (global_verbose_flag)
        fprintf(stderr, " - ");

    /* Remember that the current document contains an error */
    if (parser && parser->current_doc)
        parser->current_doc->u.document.flags |= mapred_document_error;

    fprintf(stderr, "Error: ");
    if (obj)
    {
        if (obj->name)
            fprintf(stderr, "%s '%s': ", mapred_kind_name[obj->kind], obj->name);
        else
            fprintf(stderr, "%s: ", mapred_kind_name[obj->kind]);
    }

    va_start(arg, fmt);
    vfprintf(stderr, fmt, arg);
    va_end(arg);

    /* start_mark.line is zero-based; report it one-based */
    if (parser && parser->event.start_mark.line)
        fprintf(stderr, ", at line %d", (int) parser->event.start_mark.line+1);
    fprintf(stderr, "\n");

    return MAPRED_PARSE_ERROR;
}
/*
 * copyscalar - return a freshly malloc'd copy of the NUL-terminated string
 * 's'.  The caller owns the result.
 *
 * The work is done in a helper function so that the argument is evaluated
 * exactly once and an allocation failure terminates the program with a
 * message instead of handing a NULL destination to a string copy.
 */
static char *copyscalar_dup(const char *s)
{
    size_t  size = strlen(s) + 1;
    char   *copy = malloc(size);

    if (copy == NULL)
    {
        fprintf(stderr, "Error: out of memory\n");
        abort();
    }
    memcpy(copy, s, size);
    return copy;
}
#define copyscalar(s) \
    copyscalar_dup((s))
/*
 * mapred_parse_string - parse a NUL-terminated YAML buffer.
 *
 * Sets up a libyaml parser over the buffer, hands it to mapred_parse_yaml
 * and returns the document list that call produces.  Raises
 * MAPRED_PARSE_INTERNAL through XRAISE when the YAML parser cannot be
 * initialized.
 */
mapred_olist_t* mapred_parse_string(unsigned char *yaml)
{
    yaml_parser_t   yparser;
    mapred_olist_t *result;
    size_t          length;

    XASSERT(yaml);

    if (yaml_parser_initialize(&yparser) == 0)
        XRAISE(MAPRED_PARSE_INTERNAL,
               "YAML parser initialization failed");

    length = strlen((char*) yaml);
    yaml_parser_set_input_string(&yparser, yaml, length);

    result = mapred_parse_yaml(&yparser);

    yaml_parser_delete(&yparser);
    return result;
}
/*
 * mapred_parse_file - parse YAML read from an open stdio stream.
 *
 * Sets up a libyaml parser over 'file', hands it to mapred_parse_yaml and
 * returns the document list that call produces.  Raises
 * MAPRED_PARSE_INTERNAL through XRAISE when the YAML parser cannot be
 * initialized.
 */
mapred_olist_t* mapred_parse_file(FILE *file)
{
    yaml_parser_t   yparser;
    mapred_olist_t *result;

    XASSERT(file);

    if (yaml_parser_initialize(&yparser) == 0)
        XRAISE(MAPRED_PARSE_INTERNAL,
               "YAML parser initialization failed");

    yaml_parser_set_input_file(&yparser, file);

    result = mapred_parse_yaml(&yparser);

    yaml_parser_delete(&yparser);
    return result;
}
/*
 * mapred_parse_yaml - run the grammar over an initialized YAML parser.
 *
 * Builds a zeroed mapred_parser_t around 'yparser', runs yaml_yyparse,
 * finalizes the last document and returns the resulting document list.
 * When the grammar reports an error, or any document carries the
 * mapred_document_error flag, the list is destroyed and MAPRED_PARSE_ERROR
 * is raised through XRAISE.
 */
mapred_olist_t* mapred_parse_yaml(yaml_parser_t *yparser)
{
    mapred_parser_t  ctx;
    mapred_olist_t  *cursor;
    int              failed = 0;
    int              depth;

    /* Start from an all-zero parser state */
    memset(&ctx, 0, sizeof(ctx));

#if USE_FLEX_REENTRANT
    yaml_scalar_yylex_init (&ctx.yscanner);
#endif

    /* Fields that need a non-zero starting value */
    ctx.yparser = yparser;
    ctx.state = STATE_YAML_PARSE;
    ctx.frame = -1;
    depth = 0;
    while (depth < MAX_CONTEXT_DEPTH)
    {
        ctx.context[depth] = CONTEXT_NONE;
        depth++;
    }

    /* Run the grammar; a non-zero result is a grammar error */
    failed = yaml_yyparse(&ctx);

    /* The last document is still pending: add it to the list */
    parser_add_document(&ctx);

#if USE_FLEX_REENTRANT
    yaml_scalar_yylex_destroy(ctx.yscanner);
#endif

    /* Stop at the first document that recorded an error */
    cursor = ctx.doclist;
    while (cursor && !failed)
    {
        if (cursor->object->u.document.flags & mapred_document_error)
            failed = true;
        cursor = cursor->next;
    }

    if (failed)
    {
        mapred_destroy_olist(&ctx.doclist);
        XRAISE(MAPRED_PARSE_ERROR, "parse failure");
    }
    return ctx.doclist;
}
/*
 * parser_add_document - append the current document to the parser's
 * document list.
 *
 * Does nothing when there is no current document.  Otherwise the pending
 * object is flushed into the document, the document is verified (a failure
 * sets mapred_document_error on it) and it is linked at the tail of
 * parser->doclist.
 */
void parser_add_document(mapred_parser_t *parser)
{
    mapred_olist_t  *node;
    mapred_olist_t **tail;

    if (parser->current_doc == NULL)
        return;

    /* Flush the last pending object into the document */
    parser_add_object(parser, MAPRED_NO_KIND);

    /* Verify the finished document */
    if (mapred_verify_object(parser, parser->current_doc) != NO_ERROR)
        parser->current_doc->u.document.flags |= mapred_document_error;

    /* Build the list node */
    node = malloc(sizeof(mapred_olist_t));
    node->object = parser->current_doc;
    node->next = (mapred_olist_t *) NULL;

    /* Walk to the terminating link and hook the node in there */
    for (tail = &parser->doclist; *tail != NULL; tail = &(*tail)->next)
        ;
    *tail = node;
}
/*
 * parser_begin_document - start a new document.
 *
 * Any document already in progress is added to the document list first.
 * The new document is zero-initialized, numbered from parser->doc_number
 * and stamped with the one-based line of the current YAML event.
 */
void parser_begin_document(mapred_parser_t *parser)
{
    mapred_object_t *doc;

    /* Finish the previous document, if any */
    parser_add_document(parser);

    doc = malloc(sizeof(mapred_object_t));
    memset(doc, 0, sizeof(mapred_object_t));
    doc->kind = MAPRED_DOCUMENT;
    doc->u.document.id = ++parser->doc_number;
    doc->line = (int) parser->event.start_mark.line+1;
    parser->current_doc = doc;

    if (global_verbose_flag)
        fprintf(stderr, " - Parsing YAML Document %d:\n", parser->doc_number);
}
/*
 * parser_begin_define - note that the current document has a DEFINE list.
 *
 * A second DEFINE list in the same document is reported as an error.
 */
void parser_begin_define(mapred_parser_t *parser)
{
    mapred_object_t *doc;

    XASSERT(parser->current_doc);
    doc = parser->current_doc;

    if ((doc->u.document.flags & mapred_document_defines) == 0)
        doc->u.document.flags |= mapred_document_defines;
    else
        mapred_parse_error(parser, "Duplicate DEFINE list in DOCUMENT");
}
/*
 * parser_begin_execute - note that the current document has an EXECUTE list.
 *
 * A second EXECUTE list in the same document is reported as an error.
 */
void parser_begin_execute(mapred_parser_t *parser)
{
    mapred_object_t *doc;

    XASSERT(parser->current_doc);
    doc = parser->current_doc;

    if ((doc->u.document.flags & mapred_document_executes) == 0)
        doc->u.document.flags |= mapred_document_executes;
    else
        mapred_parse_error(parser, "Duplicate EXECUTE list in DOCUMENT");
}
/*
 * parser_set_version - record the VERSION of the current document.
 *
 * A second VERSION is reported as a duplicate and ignored.  A version that
 * sorts (by strcmp) outside "1.0.0.1" .. "1.0.0.3" is reported as
 * unrecognized, but is still stored.
 */
void parser_set_version(mapred_parser_t *parser, char *value)
{
    int below_minimum;
    int above_maximum;

    XASSERT(parser->current_doc);

    if (parser->current_doc->u.document.version != NULL)
    {
        mapred_parse_error(parser, "Duplicate Version: %s", value);
        return;
    }

    /*
     * The grammar has already matched the value against a regex; what is
     * left is to check that the version is one we support.
     */
    below_minimum = (strcmp(value, "1.0.0.1") < 0);
    above_maximum = (strcmp(value, "1.0.0.3") > 0);
    if (below_minimum || above_maximum)
        mapred_parse_error(parser, "Unrecognized VERSION");

    parser->current_doc->u.document.version = copyscalar(value);
}
/*
 * parser_set_database - record the DATABASE of the current document;
 * a second setting is reported as a duplicate and ignored.
 */
void parser_set_database(mapred_parser_t *parser, char *value)
{
    XASSERT(parser->current_doc);

    if (parser->current_doc->u.document.database == NULL)
        parser->current_doc->u.document.database = copyscalar(value);
    else
        mapred_parse_error(parser, "Duplicate Database: %s", value);
}
/*
 * parser_set_user - record the USER of the current document;
 * a second setting is reported as a duplicate and ignored.
 */
void parser_set_user(mapred_parser_t *parser, char *value)
{
    XASSERT(parser->current_doc);

    if (parser->current_doc->u.document.user == NULL)
        parser->current_doc->u.document.user = copyscalar(value);
    else
        mapred_parse_error(parser, "Duplicate User: %s", value);
}
/*
 * parser_set_host - record the HOST of the current document;
 * a second setting is reported as a duplicate and ignored.
 */
void parser_set_host(mapred_parser_t *parser, char *value)
{
    XASSERT(parser->current_doc);

    if (parser->current_doc->u.document.host == NULL)
        parser->current_doc->u.document.host = copyscalar(value);
    else
        mapred_parse_error(parser, "Duplicate Host: %s", value);
}
/*
 * parser_set_port - record the PORT of the current document.
 *
 * A port that is already positive is treated as previously set, and the
 * new value is reported as a duplicate and ignored.
 */
void parser_set_port(mapred_parser_t *parser, char *value)
{
    long parsed;

    XASSERT(parser->current_doc);

    if (parser->current_doc->u.document.port > 0)
    {
        mapred_parse_error(parser, "Duplicate Port: %s", value);
        return;
    }

    /*
     * The grammar only lets digit sequences through, so the conversion is
     * expected to succeed.
     */
    parsed = strtol(value, NULL, 10);
    parser->current_doc->u.document.port = (int) parsed;
}
/*
 * parser_add_object - finish the current object and start a new one.
 *
 * If an object is in progress it is verified: on failure it is destroyed
 * and the document is flagged with mapred_document_error; on success it is
 * appended to the document's object list (and announced on stderr in
 * verbose mode).
 *
 * When 'kind' is MAPRED_NO_KIND no new object is started and the current
 * object is cleared; this is how the last object of a document gets added.
 * Otherwise a zeroed object of the requested kind becomes the current
 * object, stamped with the one-based line of the current YAML event.
 */
void parser_add_object(mapred_parser_t *parser, mapred_kind_t kind)
{
    mapred_object_t *fresh;

    XASSERT(parser->current_doc);

    if (parser->current_obj)
    {
        if (mapred_verify_object(parser, parser->current_obj) != NO_ERROR)
        {
            /* Invalid object: drop it and mark the document */
            mapred_destroy_object(&parser->current_obj);
            parser->current_doc->u.document.flags |=
                mapred_document_error;
        }
        else
        {
            mapred_olist_t  *newitem;
            mapred_olist_t **tail;

            /* Build the list node for the finished object */
            newitem = malloc(sizeof(mapred_olist_t));
            newitem->object = parser->current_obj;
            newitem->next = (mapred_olist_t *) NULL;

            /* Hook it in at the end of the document's object list */
            tail = &parser->current_doc->u.document.objects;
            while (*tail != NULL)
                tail = &(*tail)->next;
            *tail = newitem;

            if (global_verbose_flag)
            {
                const char *type, *name;

                XASSERT (newitem->object->kind > 0 &&
                         newitem->object->kind <= MAPRED_MAXKIND);
                type = mapred_kind_name[newitem->object->kind];
                name = newitem->object->name;
                if (!name)
                    fprintf(stderr, " - %s\n", type);
                else
                    fprintf(stderr, " - %s: %s\n", type, name);
            }
        }
    }

    /* MAPRED_NO_KIND means "flush only": no new object is started */
    if (kind == MAPRED_NO_KIND)
    {
        parser->current_obj = (mapred_object_t *) NULL;
        return;
    }

    /* Start a new empty object of the requested kind */
    fresh = malloc(sizeof(mapred_object_t));
    memset(fresh, 0, sizeof(mapred_object_t));
    fresh->kind = kind;
    fresh->line = (int) parser->event.start_mark.line+1;
    parser->current_obj = fresh;
}
/*
 * parser_add_run - start a new execution (RUN) object.
 *
 * Executions share the 'task' structure; they differ from tasks in that
 * they are marked for execution and neither need nor support a NAME.
 */
void parser_add_run(mapred_parser_t *parser)
{
    mapred_object_t *run;

    parser_add_object(parser, MAPRED_EXECUTION);

    run = parser->current_obj;
    run->u.task.execute = true;
}
/*
 * parser_set_name - set the NAME of the current object.
 *
 * A NULL or empty name is reported as invalid and replaced by "?".
 * A name on an object that already has one is reported as a duplicate and
 * ignored.
 */
void parser_set_name(mapred_parser_t *parser, char *value)
{
    XASSERT(parser->current_doc);
    XASSERT(parser->current_obj);
    XASSERT(parser->current_obj->kind == MAPRED_INPUT ||
            parser->current_obj->kind == MAPRED_OUTPUT ||
            parser->current_obj->kind == MAPRED_MAPPER ||
            parser->current_obj->kind == MAPRED_TRANSITION ||
            parser->current_obj->kind == MAPRED_COMBINER ||
            parser->current_obj->kind == MAPRED_FINALIZER ||
            parser->current_obj->kind == MAPRED_REDUCER ||
            parser->current_obj->kind == MAPRED_TASK);

    /* Invalid name: report it and carry on with a placeholder */
    if (value == NULL || value[0] == '\0')
    {
        value = "?";
        mapred_parse_error(parser, "Invalid NAME: %s", value);
    }

    if (parser->current_obj->name == NULL)
        parser->current_obj->name = copyscalar(value);
    else
        mapred_parse_error(parser, "Duplicate NAME: %s", value);
}
/*
 * parser_set_table - set TABLE as the source of an INPUT or the
 * destination of an OUTPUT.
 *
 * A NULL or empty value is reported once as "Invalid TABLE"; the object's
 * type is still switched to TABLE when no type had been chosen yet.
 * When the object already has a type, the error names the conflict:
 * a second TABLE is a duplicate, anything else is incompatible with TABLE.
 */
void parser_set_table(mapred_parser_t *parser, char *value)
{
    mapred_object_t *obj;
    int              invalid;

    XASSERT(parser->current_doc);
    XASSERT(parser->current_obj);
    XASSERT(parser->current_obj->kind == MAPRED_INPUT ||
            parser->current_obj->kind == MAPRED_OUTPUT);

    obj = parser->current_obj;

    /*
     * The invalid-value error is raised in the per-kind branches below;
     * reporting it here as well would print the same message twice.
     */
    invalid = (!value || strlen(value) == 0);

    if (obj->kind == MAPRED_INPUT)
    {
        if (invalid)
        {
            if (obj->u.input.type == MAPRED_INPUT_NONE)
                obj->u.input.type = MAPRED_INPUT_TABLE;
            mapred_parse_error(parser, "Invalid TABLE");
            return;
        }
        if (obj->u.input.type != MAPRED_INPUT_NONE)
        {
            switch (obj->u.input.type)
            {
                case MAPRED_INPUT_TABLE:
                    mapred_parse_error(parser,
                                       "Duplicate TABLE");
                    return;
                case MAPRED_INPUT_FILE:
                    mapred_parse_error(parser,
                                       "FILE is incompatible with TABLE");
                    return;
                case MAPRED_INPUT_GPFDIST:
                    mapred_parse_error(parser,
                                       "GPFDIST is incompatible with TABLE");
                    return;
                case MAPRED_INPUT_QUERY:
                    mapred_parse_error(parser,
                                       "QUERY is incompatible with TABLE");
                    return;
                case MAPRED_INPUT_EXEC:
                    /* name the source that is actually in conflict */
                    mapred_parse_error(parser,
                                       "EXEC is incompatible with TABLE");
                    return;
                default:
                    XASSERT(false);
            }
        }
        obj->u.input.type = MAPRED_INPUT_TABLE;
        obj->u.input.desc = copyscalar(value);
    }
    else
    {
        if (invalid)
        {
            if (obj->u.output.type == MAPRED_OUTPUT_NONE)
                obj->u.output.type = MAPRED_OUTPUT_TABLE;
            mapred_parse_error(parser, "Invalid TABLE");
            return;
        }
        if (obj->u.output.type != MAPRED_OUTPUT_NONE)
        {
            switch (obj->u.output.type)
            {
                case MAPRED_OUTPUT_TABLE:
                    mapred_parse_error(parser,
                                       "Duplicate TABLE");
                    return;
                case MAPRED_OUTPUT_FILE:
                    mapred_parse_error(parser,
                                       "FILE is incompatible with TABLE");
                    return;
                default:
                    XASSERT(false);
            }
        }
        obj->u.output.type = MAPRED_OUTPUT_TABLE;
        obj->u.output.desc = copyscalar(value);
    }
}
/*
 * parser_set_query - set QUERY as the source of the current INPUT.
 *
 * A NULL or empty value is reported as invalid and treated as "".
 * An INPUT that already has a source reports either a duplicate QUERY or
 * a conflict with the other source kinds, and is left unchanged.
 */
void parser_set_query(mapred_parser_t *parser, char *value)
{
    mapred_object_t *obj;

    XASSERT(parser->current_doc);
    XASSERT(parser->current_obj);
    XASSERT(parser->current_obj->kind == MAPRED_INPUT);

    obj = parser->current_obj;

    if (value == NULL || value[0] == '\0')
    {
        value = "";
        mapred_parse_error(parser, "Invalid QUERY");
    }

    switch (obj->u.input.type)
    {
        case MAPRED_INPUT_NONE:
            obj->u.input.type = MAPRED_INPUT_QUERY;
            obj->u.input.desc = copyscalar(value);
            break;

        case MAPRED_INPUT_QUERY:
            mapred_parse_error(parser, "Duplicate QUERY for INPUT");
            break;

        default:
            mapred_parse_error(parser, "INPUT may only specify one of "
                               "FILE, GPFDIST, TABLE, QUERY, EXEC");
            break;
    }
}
/*
 * parser_set_exec - set EXEC as the source of the current INPUT.
 *
 * A NULL or empty value is reported as invalid and treated as "".
 * An INPUT that already has a source reports either a duplicate EXEC or
 * a conflict with the other source kinds, and is left unchanged.
 */
void parser_set_exec(mapred_parser_t *parser, char *value)
{
    mapred_object_t *obj;

    XASSERT(parser->current_doc);
    XASSERT(parser->current_obj);
    XASSERT(parser->current_obj->kind == MAPRED_INPUT);

    obj = parser->current_obj;

    if (value == NULL || value[0] == '\0')
    {
        value = "";
        mapred_parse_error(parser, "Invalid EXEC");
    }

    if (obj->u.input.type == MAPRED_INPUT_NONE)
    {
        obj->u.input.type = MAPRED_INPUT_EXEC;
        obj->u.input.desc = copyscalar(value);
    }
    else if (obj->u.input.type == MAPRED_INPUT_EXEC)
    {
        mapred_parse_error(parser, "Duplicate EXEC for INPUT");
    }
    else
    {
        mapred_parse_error(parser, "INPUT may only specify one of "
                           "FILE, GPFDIST, TABLE, QUERY, EXEC");
    }
}
/*
 * parser_set_format - set the FORMAT of the current INPUT or OUTPUT.
 *
 * Recognized values, compared case-insensitively, are "text" and "csv".
 * Anything else (including NULL) is reported as "Invalid FORMAT".
 * A FORMAT on an object that already has one is reported as
 * "Duplicate FORMAT".  In both error cases the stored format becomes
 * MAPRED_FORMAT_INVALID.
 */
void parser_set_format(mapred_parser_t *parser, char *value)
{
    mapred_format_t format;

    XASSERT(parser->current_doc);
    XASSERT(parser->current_obj);

    /* Translate the scalar into a format code */
    if (value && !strcasecmp(value, "text"))
        format = MAPRED_FORMAT_TEXT;
    else if (value && !strcasecmp(value, "csv"))
        format = MAPRED_FORMAT_CSV;
    else
        format = MAPRED_FORMAT_INVALID;

    switch (parser->current_obj->kind)
    {
        case MAPRED_INPUT:
            /* An unrecognized value is invalid, not a duplicate */
            if (format == MAPRED_FORMAT_INVALID)
                mapred_parse_error(parser, "Invalid FORMAT");
            if (parser->current_obj->u.input.format != MAPRED_FORMAT_NONE)
            {
                format = MAPRED_FORMAT_INVALID;
                mapred_parse_error(parser, "Duplicate FORMAT");
            }
            parser->current_obj->u.input.format = format;
            return;

        case MAPRED_OUTPUT:
            /* An unrecognized value is invalid, not a duplicate */
            if (format == MAPRED_FORMAT_INVALID)
                mapred_parse_error(parser, "Invalid FORMAT");
            if (parser->current_obj->u.output.format != MAPRED_FORMAT_NONE)
            {
                format = MAPRED_FORMAT_INVALID;
                mapred_parse_error(parser, "Duplicate FORMAT");
            }
            parser->current_obj->u.output.format = format;
            return;

        default:
            XASSERT(false);
    }
}
/*
 * parser_set_delimiter - set the DELIMITER of the current INPUT or OUTPUT.
 *
 * A NULL or empty value is reported as invalid and treated as "".
 * A DELIMITER on an object that already has one is reported as a
 * duplicate and ignored.
 */
void parser_set_delimiter(mapred_parser_t *parser, char *value)
{
    mapred_object_t *obj;

    XASSERT(parser->current_doc);
    XASSERT(parser->current_obj);

    obj = parser->current_obj;

    if (value == NULL || value[0] == '\0')
    {
        value = "";
        mapred_parse_error(parser, "Invalid DELIMITER");
    }

    switch (obj->kind)
    {
        case MAPRED_INPUT:
            if (obj->u.input.delimiter == NULL)
                obj->u.input.delimiter = copyscalar(value);
            else
                mapred_parse_error(parser, "Duplicate DELIMITER");
            break;

        case MAPRED_OUTPUT:
            if (obj->u.output.delimiter == NULL)
                obj->u.output.delimiter = copyscalar(value);
            else
                mapred_parse_error(parser, "Duplicate DELIMITER");
            break;

        default:
            XASSERT(false);
    }
}
/*
 * parser_set_escape - set the ESCAPE string of the current INPUT.
 *
 * A NULL or empty value is reported as invalid and treated as "".
 * An ESCAPE on an INPUT that already has one is reported as a duplicate
 * and ignored.
 */
void parser_set_escape(mapred_parser_t *parser, char *value)
{
    XASSERT(parser->current_doc);
    XASSERT(parser->current_obj);
    XASSERT(parser->current_obj->kind == MAPRED_INPUT);

    if (value == NULL || value[0] == '\0')
    {
        value = "";
        mapred_parse_error(parser, "Invalid ESCAPE");
    }

    if (parser->current_obj->u.input.escape == NULL)
        parser->current_obj->u.input.escape = copyscalar(value);
    else
        mapred_parse_error(parser, "Duplicate ESCAPE");
}
/*
 * parser_set_null - set the NULL marker string of the current INPUT.
 *
 * A NULL or empty value is reported as invalid and the stored marker is
 * replaced by "".  A NULL marker on an INPUT that already has one is
 * reported as a duplicate and ignored.
 */
void parser_set_null(mapred_parser_t *parser, char *value)
{
    XASSERT(parser->current_doc);
    XASSERT(parser->current_obj);
    XASSERT(parser->current_obj->kind == MAPRED_INPUT);

    if (!value || strlen(value) == 0)
    {
        /*
         * A marker may already be stored; release it before it is
         * overwritten so the earlier copy is not leaked.
         */
        free(parser->current_obj->u.input.null);
        parser->current_obj->u.input.null = copyscalar("");
        mapred_parse_error(parser, "Invalid NULL");
        return;
    }

    if (parser->current_obj->u.input.null)
    {
        mapred_parse_error(parser, "Duplicate NULL");
        return;
    }

    parser->current_obj->u.input.null = copyscalar(value);
}
/*
 * parser_set_quote - set the QUOTE string of the current INPUT.
 *
 * A NULL or empty value is reported as invalid and the stored quote is
 * replaced by "".  A QUOTE on an INPUT that already has one is reported as
 * a duplicate and ignored.
 */
void parser_set_quote(mapred_parser_t *parser, char *value)
{
    XASSERT(parser->current_doc);
    XASSERT(parser->current_obj);
    XASSERT(parser->current_obj->kind == MAPRED_INPUT);

    if (!value || strlen(value) == 0)
    {
        /*
         * A quote may already be stored; release it before it is
         * overwritten so the earlier copy is not leaked.
         */
        free(parser->current_obj->u.input.quote);
        parser->current_obj->u.input.quote = copyscalar("");
        mapred_parse_error(parser, "Invalid QUOTE");
        return;
    }

    if (parser->current_obj->u.input.quote)
    {
        mapred_parse_error(parser, "Duplicate QUOTE");
        return;
    }

    parser->current_obj->u.input.quote = copyscalar(value);
}
/*
 * parser_set_encoding - set the ENCODING of the current INPUT.
 *
 * A NULL or empty value is reported as invalid and the stored encoding is
 * replaced by "".  An ENCODING on an INPUT that already has one is
 * reported as a duplicate and ignored.
 */
void parser_set_encoding(mapred_parser_t *parser, char *value)
{
    XASSERT(parser->current_doc);
    XASSERT(parser->current_obj);
    XASSERT(parser->current_obj->kind == MAPRED_INPUT);

    if (!value || strlen(value) == 0)
    {
        /*
         * An encoding may already be stored; release it before it is
         * overwritten so the earlier copy is not leaked.
         */
        free(parser->current_obj->u.input.encoding);
        parser->current_obj->u.input.encoding = copyscalar("");
        mapred_parse_error(parser, "Invalid ENCODING");
        return;
    }

    if (parser->current_obj->u.input.encoding)
    {
        mapred_parse_error(parser, "Duplicate ENCODING");
        return;
    }

    parser->current_obj->u.input.encoding = copyscalar(value);
}
/*
 * parser_set_error_limit - set the ERROR_LIMIT of the current INPUT.
 *
 * A NULL or empty value is reported as invalid and the limit is set to -1.
 * A limit that is already positive is treated as previously set, and the
 * new value is reported as a duplicate and ignored.
 */
void parser_set_error_limit(mapred_parser_t *parser, char *value)
{
    mapred_object_t *obj;

    XASSERT(parser->current_doc);
    XASSERT(parser->current_obj);
    XASSERT(parser->current_obj->kind == MAPRED_INPUT);

    obj = parser->current_obj;

    if (value == NULL || value[0] == '\0')
    {
        obj->u.input.error_limit = -1;
        mapred_parse_error(parser, "Invalid ERROR_LIMIT");
    }
    else if (obj->u.input.error_limit > 0)
    {
        mapred_parse_error(parser, "Duplicate ERROR_LIMIT");
    }
    else
    {
        /*
         * The grammar only lets digit sequences through, so the
         * conversion is expected to succeed.
         */
        obj->u.input.error_limit = (int) strtol(value, NULL, 10);
    }
}
/*
 * parser_set_mode - set the MODE of a function object or of an OUTPUT.
 *
 * Functions (MAP, TRANSITION, CONSOLIDATE, FINALIZE) accept "single" and
 * "multi", with "multi" restricted to MAP and FINALIZE.  OUTPUTs accept
 * "replace" and "append".  Values are compared case-insensitively.
 *
 * An unrecognized value is reported as invalid; a MODE on an object that
 * already has one is reported as a duplicate.  In both cases the stored
 * mode becomes the respective INVALID value.
 */
void parser_set_mode(mapred_parser_t *parser, char *value)
{
    mapred_object_t *obj;

    XASSERT(parser->current_doc);
    XASSERT(parser->current_obj);

    obj = parser->current_obj;

    switch (obj->kind)
    {
        case MAPRED_MAPPER:
        case MAPRED_TRANSITION:
        case MAPRED_COMBINER:
        case MAPRED_FINALIZER:
        {
            mapred_mode_t mode = MAPRED_MODE_INVALID;

            /* Translate the scalar into a function mode */
            if (value)
            {
                if (strcasecmp(value, "single") == 0)
                    mode = MAPRED_MODE_SINGLE;
                else if (strcasecmp(value, "multi") == 0)
                    mode = MAPRED_MODE_MULTI;
            }

            /* MULTI is only available to MAP and FINALIZE */
            if (mode == MAPRED_MODE_MULTI &&
                !(obj->kind == MAPRED_MAPPER ||
                  obj->kind == MAPRED_FINALIZER))
            {
                mode = MAPRED_MODE_INVALID;
            }

            if (mode == MAPRED_MODE_INVALID)
                mapred_parse_error(parser, "Invalid MODE");

            if (obj->u.function.mode != MAPRED_MODE_NONE)
            {
                mode = MAPRED_MODE_INVALID;
                mapred_parse_error(parser, "Duplicate MODE");
            }

            obj->u.function.mode = mode;
            return;
        }

        case MAPRED_OUTPUT:
        {
            mapred_output_mode_t mode = MAPRED_OUTPUT_MODE_INVALID;

            /* Translate the scalar into an output mode */
            if (value)
            {
                if (strcasecmp(value, "replace") == 0)
                    mode = MAPRED_OUTPUT_MODE_REPLACE;
                else if (strcasecmp(value, "append") == 0)
                    mode = MAPRED_OUTPUT_MODE_APPEND;
            }

            if (mode == MAPRED_OUTPUT_MODE_INVALID)
                mapred_parse_error(parser, "Invalid MODE");

            if (obj->u.output.mode != MAPRED_OUTPUT_MODE_NONE)
            {
                mode = MAPRED_OUTPUT_MODE_INVALID;
                mapred_parse_error(parser, "Duplicate MODE");
            }

            obj->u.output.mode = mode;
            return;
        }

        default:
            XASSERT(false);  /* only functions and OUTPUTs have modes */
    }
}
/*
 * parser_set_file - set FILE as the destination of the current OUTPUT.
 *
 * Only OUTPUTs use this: they have a single file.  INPUTs build a file
 * list through parser_begin_files and parser_add_file instead.
 *
 * With no destination chosen yet the type becomes FILE, and a NULL or
 * empty value is then reported as invalid.  An OUTPUT that already has a
 * FILE or a TABLE reports the conflict and is left unchanged.
 */
void parser_set_file(mapred_parser_t *parser, char *value)
{
    mapred_object_t *obj;

    XASSERT(parser->current_doc);
    XASSERT(parser->current_obj);
    XASSERT(parser->current_obj->kind == MAPRED_OUTPUT);

    obj = parser->current_obj;

    switch (obj->u.output.type)
    {
        case MAPRED_OUTPUT_FILE:
            mapred_parse_error(parser, "Duplicate FILE");
            return;

        case MAPRED_OUTPUT_TABLE:
            mapred_parse_error(parser, "TABLE is incompatible with FILE");
            return;

        case MAPRED_OUTPUT_NONE:
            /* The type is claimed even when the value turns out invalid */
            obj->u.output.type = MAPRED_OUTPUT_FILE;
            if (value != NULL && value[0] != '\0')
                obj->u.output.desc = copyscalar(value);
            else
                mapred_parse_error(parser, "Invalid FILE");
            break;

        default:
            XASSERT(false);
    }
}
/*
 * parser_set_transition - record the TRANSITION function name of the
 * current REDUCER; a second one is reported as a duplicate and ignored.
 */
void parser_set_transition(mapred_parser_t *parser, char *value)
{
    XASSERT(parser->current_doc);
    XASSERT(parser->current_obj);
    XASSERT(parser->current_obj->kind == MAPRED_REDUCER);

    if (parser->current_obj->u.reducer.transition.name == NULL)
        parser->current_obj->u.reducer.transition.name = copyscalar(value);
    else
        mapred_parse_error(parser, "Duplicate TRANSITION for REDUCE");
}
/*
 * parser_set_combiner - record the CONSOLIDATE function name of the
 * current REDUCER; a second one is reported as a duplicate and ignored.
 */
void parser_set_combiner(mapred_parser_t *parser, char *value)
{
    XASSERT(parser->current_doc);
    XASSERT(parser->current_obj);
    XASSERT(parser->current_obj->kind == MAPRED_REDUCER);

    if (parser->current_obj->u.reducer.combiner.name == NULL)
        parser->current_obj->u.reducer.combiner.name = copyscalar(value);
    else
        mapred_parse_error(parser, "Duplicate CONSOLIDATE for REDUCE");
}
/*
 * parser_set_finalizer - record the FINALIZE function name of the
 * current REDUCER; a second one is reported as a duplicate and ignored.
 */
void parser_set_finalizer(mapred_parser_t *parser, char *value)
{
    XASSERT(parser->current_doc);
    XASSERT(parser->current_obj);
    XASSERT(parser->current_obj->kind == MAPRED_REDUCER);

    if (parser->current_obj->u.reducer.finalizer.name == NULL)
        parser->current_obj->u.reducer.finalizer.name = copyscalar(value);
    else
        mapred_parse_error(parser, "Duplicate FINALIZE for REDUCE");
}
/*
 * parser_set_initialize - record the INITIALIZE value of the current
 * REDUCER; a second one is reported as a duplicate and ignored.
 */
void parser_set_initialize(mapred_parser_t *parser, char *value)
{
    XASSERT(parser->current_doc);
    XASSERT(parser->current_obj);
    XASSERT(parser->current_obj->kind == MAPRED_REDUCER);

    if (parser->current_obj->u.reducer.initialize == NULL)
        parser->current_obj->u.reducer.initialize = copyscalar(value);
    else
        mapred_parse_error(parser, "Duplicate INITIALIZE for REDUCE");
}
/*
 * parser_set_language - set the LANGUAGE of the current function object.
 *
 * A NULL or empty value is reported as invalid and treated as "".
 * A LANGUAGE on a function that already has one is reported as a
 * duplicate and ignored.
 */
void parser_set_language(mapred_parser_t *parser, char *value)
{
    XASSERT(parser->current_doc);
    XASSERT(parser->current_obj);
    XASSERT(parser->current_obj->kind == MAPRED_MAPPER ||
            parser->current_obj->kind == MAPRED_TRANSITION ||
            parser->current_obj->kind == MAPRED_COMBINER ||
            parser->current_obj->kind == MAPRED_FINALIZER);

    if (value == NULL || value[0] == '\0')
    {
        value = "";
        mapred_parse_error(parser, "Invalid LANGUAGE");
    }

    if (parser->current_obj->u.function.language == NULL)
        parser->current_obj->u.function.language = copyscalar(value);
    else
        mapred_parse_error(parser, "Duplicate LANGUAGE");
}
/*
 * parser_set_function - set the FUNCTION body of the current function
 * object and record the line its body is attributed to.
 *
 * A NULL or empty value is reported as invalid and treated as "".
 * A FUNCTION on an object that already has a body is reported as a
 * duplicate and ignored.
 */
void parser_set_function(mapred_parser_t *parser, char *value)
{
    mapred_object_t *obj;
    int              adjust;

    XASSERT(parser->current_doc);
    XASSERT(parser->current_obj);
    XASSERT(parser->current_obj->kind == MAPRED_MAPPER ||
            parser->current_obj->kind == MAPRED_TRANSITION ||
            parser->current_obj->kind == MAPRED_COMBINER ||
            parser->current_obj->kind == MAPRED_FINALIZER);

    obj = parser->current_obj;

    if (value == NULL || value[0] == '\0')
    {
        value = "";
        mapred_parse_error(parser, "Invalid FUNCTION");
    }

    if (obj->u.function.body != NULL)
    {
        mapred_parse_error(parser, "Duplicate FUNCTION");
        return;
    }
    obj->u.function.body = copyscalar(value);

    /*
     * The event's start mark carries a line number, but which line it
     * points at depends on the scalar style, so it is adjusted per style.
     */
    switch (parser->event.data.scalar.style)
    {
        case YAML_LITERAL_SCALAR_STYLE:
        case YAML_FOLDED_SCALAR_STYLE:
            adjust = 2;
            break;

        case YAML_PLAIN_SCALAR_STYLE:
        case YAML_SINGLE_QUOTED_SCALAR_STYLE:
        case YAML_DOUBLE_QUOTED_SCALAR_STYLE:
            adjust = 1;
            break;

        default:
            adjust = 0;
            break;
    }

    obj->u.function.lineno = parser->event.start_mark.line;
    obj->u.function.lineno += adjust;
}
/*
 * parser_set_library - set the LIBRARY of the current function object.
 *
 * LIBRARY was added in mapreduce YAML schema version 1.0.0.2.  C language
 * functions use it to name the code library that defines the function:
 *
 *   - MAP:
 *       ...
 *       LIBRARY: $libdir/libfoo
 *       FUNCTION: myFunc
 *
 * A NULL or empty value is reported as invalid and treated as "".
 * A LIBRARY on a function that already has one is reported as a duplicate
 * and ignored.  The check that the document version is >= 1.0.0.2 is left
 * to object verification.
 */
void parser_set_library(mapred_parser_t *parser, char *value)
{
    XASSERT(parser->current_doc);
    XASSERT(parser->current_obj);
    XASSERT(parser->current_obj->kind == MAPRED_MAPPER ||
            parser->current_obj->kind == MAPRED_TRANSITION ||
            parser->current_obj->kind == MAPRED_COMBINER ||
            parser->current_obj->kind == MAPRED_FINALIZER);

    if (value == NULL || value[0] == '\0')
    {
        value = "";
        mapred_parse_error(parser, "Invalid LIBRARY");
    }

    if (parser->current_obj->u.function.library == NULL)
        parser->current_obj->u.function.library = copyscalar(value);
    else
        mapred_parse_error(parser, "Duplicate LIBRARY");
}
/*
 * parser_set_optimize - placeholder; the value is currently ignored.
 */
void parser_set_optimize(mapred_parser_t *parser, char *value)
{
    /* FIXME: not implemented */
    (void) parser;
    (void) value;
}
/*
 * parser_set_source - record the SOURCE (input name) of the current TASK
 * or RUN; a second one is reported as a duplicate and ignored.
 */
void parser_set_source(mapred_parser_t *parser, char *value)
{
    mapred_object_t *obj;

    XASSERT(parser->current_doc);
    XASSERT(parser->current_obj);

    obj = parser->current_obj;

    switch (obj->kind)
    {
        case MAPRED_TASK:
        case MAPRED_EXECUTION:
            /* Both kinds store the name in the task structure */
            if (obj->u.task.input.name == NULL)
                obj->u.task.input.name = copyscalar(value);
            else if (obj->kind == MAPRED_TASK)
                mapred_parse_error(parser, "Duplicate SOURCE for TASK");
            else
                mapred_parse_error(parser, "Duplicate SOURCE for RUN");
            break;

        default:
            XASSERT(false);
    }
}
/*
 * parser_set_target - record the TARGET (output name) of the current RUN;
 * a second one is reported as a duplicate and ignored.
 */
void parser_set_target(mapred_parser_t *parser, char *value)
{
    XASSERT(parser->current_doc);
    XASSERT(parser->current_obj);
    XASSERT(parser->current_obj->kind == MAPRED_EXECUTION);

    if (parser->current_obj->u.task.output.name == NULL)
        parser->current_obj->u.task.output.name = copyscalar(value);
    else
        mapred_parse_error(parser, "Duplicate TARGET for RUN");
}
/*
 * parser_set_mapper - record the MAP (mapper name) of the current TASK or
 * RUN; a second one is reported as a duplicate and ignored.
 */
void parser_set_mapper(mapred_parser_t *parser, char *value)
{
    mapred_object_t *obj;

    XASSERT(parser->current_doc);
    XASSERT(parser->current_obj);

    obj = parser->current_obj;

    switch (obj->kind)
    {
        case MAPRED_TASK:
        case MAPRED_EXECUTION:
            /* Both kinds store the name in the task structure */
            if (obj->u.task.mapper.name == NULL)
                obj->u.task.mapper.name = copyscalar(value);
            else if (obj->kind == MAPRED_TASK)
                mapred_parse_error(parser, "Duplicate MAP for TASK");
            else
                mapred_parse_error(parser, "Duplicate MAP for RUN");
            break;

        default:
            XASSERT(false);
    }
}
/*
 * parser_set_reducer - record the REDUCE (reducer name) of the current
 * TASK or RUN; a second one is reported as a duplicate and ignored.
 */
void parser_set_reducer(mapred_parser_t *parser, char *value)
{
    mapred_object_t *obj;

    XASSERT(parser->current_doc);
    XASSERT(parser->current_obj);

    obj = parser->current_obj;

    switch (obj->kind)
    {
        case MAPRED_TASK:
        case MAPRED_EXECUTION:
            /* Both kinds store the name in the task structure */
            if (obj->u.task.reducer.name == NULL)
                obj->u.task.reducer.name = copyscalar(value);
            else if (obj->kind == MAPRED_TASK)
                mapred_parse_error(parser, "Duplicate REDUCE for TASK");
            else
                mapred_parse_error(parser, "Duplicate REDUCE for RUN");
            break;

        default:
            XASSERT(false);
    }
}
/*
 * parser_begin_ordering - called when an ORDERING list starts on the
 * current REDUCER; reports an error if the reducer already has one.
 *
 * The check that the document version is >= 1.0.0.3 is left to object
 * verification.
 */
void parser_begin_ordering(mapred_parser_t *parser)
{
    XASSERT(parser->current_doc);
    XASSERT(parser->current_obj);
    XASSERT(parser->current_obj->kind == MAPRED_REDUCER);

    if (parser->current_obj->u.reducer.ordering != NULL)
        mapred_parse_error(parser, "Duplicate ORDERING for REDUCER");
}
/*
 * parser_add_ordering - append one ORDERING entry to the current REDUCER.
 *
 * An ordering may be an arbitrary expression, so the only check made here
 * is that the value is not NULL or empty; stricter validation would belong
 * in the grammar.
 */
void parser_add_ordering(mapred_parser_t *parser, char *value)
{
    mapred_clist_t  *entry;
    mapred_clist_t **tail;

    XASSERT(parser->current_doc);
    XASSERT(parser->current_obj);
    XASSERT(parser->current_obj->kind == MAPRED_REDUCER);

    if (value == NULL || value[0] == '\0')
    {
        mapred_parse_error(parser, "Invalid ORDERING");
        return;
    }

    /* Build the list node */
    entry = malloc(sizeof(mapred_clist_t));
    entry->value = copyscalar(value);
    entry->next = (mapred_clist_t *) NULL;

    /* Walk to the terminating link and hook the node in there */
    tail = &parser->current_obj->u.reducer.ordering;
    while (*tail != NULL)
        tail = &(*tail)->next;
    *tail = entry;
}
/* List functions */
/*
 * parser_begin_files - called when a FILE list starts on the current
 * INPUT.
 *
 * Marks the INPUT as file-sourced; the files themselves are added one at
 * a time.  An INPUT that already has a source reports either a duplicate
 * FILE or a conflict with the other source kinds.
 */
void parser_begin_files(mapred_parser_t *parser)
{
    mapred_object_t *obj;

    XASSERT(parser->current_doc);
    XASSERT(parser->current_obj);
    XASSERT(parser->current_obj->kind == MAPRED_INPUT);

    obj = parser->current_obj;

    switch (obj->u.input.type)
    {
        case MAPRED_INPUT_NONE:
            obj->u.input.type = MAPRED_INPUT_FILE;
            break;

        case MAPRED_INPUT_FILE:
            mapred_parse_error(parser, "Duplicate FILE for INPUT");
            break;

        default:
            mapred_parse_error(parser, "INPUT may only specify one of "
                               "FILE, GPFDIST, TABLE, QUERY, EXEC");
            break;
    }
}
/*
 * parser_begin_gpfdist - called when a GPFDIST list starts on the current
 * INPUT.
 *
 * Marks the INPUT as gpfdist-sourced.  An INPUT that already has a source
 * reports either a duplicate GPFDIST or a conflict with the other source
 * kinds.
 */
void parser_begin_gpfdist(mapred_parser_t *parser)
{
    mapred_object_t *obj;

    XASSERT(parser->current_doc);
    XASSERT(parser->current_obj);
    XASSERT(parser->current_obj->kind == MAPRED_INPUT);

    obj = parser->current_obj;

    switch (obj->u.input.type)
    {
        case MAPRED_INPUT_NONE:
            obj->u.input.type = MAPRED_INPUT_GPFDIST;
            break;

        case MAPRED_INPUT_GPFDIST:
            mapred_parse_error(parser, "Duplicate GPFDIST for INPUT");
            break;

        default:
            mapred_parse_error(parser, "INPUT may only specify one of "
                               "FILE, GPFDIST, TABLE, QUERY, EXEC");
            break;
    }
}
/*
 * parser_begin_columns - called when a COLUMNS list starts on the current
 * INPUT; reports an error if the INPUT already has columns.
 */
void parser_begin_columns(mapred_parser_t *parser)
{
    XASSERT(parser->current_doc);
    XASSERT(parser->current_obj);
    XASSERT(parser->current_obj->kind == MAPRED_INPUT);

    if (parser->current_obj->u.input.columns != NULL)
        mapred_parse_error(parser, "Duplicate COLUMNS for INPUT");
}
/*
 * parser_begin_parameters - called when a PARAMETERS list starts on the
 * current function object; reports an error, worded for the kind of
 * function, if the object already has parameters.
 */
void parser_begin_parameters(mapred_parser_t *parser)
{
    mapred_object_t *obj;

    XASSERT(parser->current_doc);
    XASSERT(parser->current_obj);

    obj = parser->current_obj;

    /* Only function objects carry parameters */
    switch (obj->kind)
    {
        case MAPRED_MAPPER:
        case MAPRED_TRANSITION:
        case MAPRED_COMBINER:
        case MAPRED_FINALIZER:
            break;

        default:
            XASSERT(false);
            return;
    }

    if (obj->u.function.parameters == NULL)
        return;

    if (obj->kind == MAPRED_MAPPER)
        mapred_parse_error(parser, "Duplicate PARAMETERS for MAP");
    else if (obj->kind == MAPRED_TRANSITION)
        mapred_parse_error(parser, "Duplicate PARAMETERS for TRANSITION");
    else if (obj->kind == MAPRED_COMBINER)
        mapred_parse_error(parser, "Duplicate PARAMETERS for CONSOLIDATE");
    else
        mapred_parse_error(parser, "Duplicate PARAMETERS for FINALIZE");
}
/*
 * parser_begin_returns - called when a RETURNS list starts on the current
 * function object; reports an error, worded for the kind of function, if
 * the object already has return columns.
 */
void parser_begin_returns(mapred_parser_t *parser)
{
    mapred_object_t *obj;

    XASSERT(parser->current_doc);
    XASSERT(parser->current_obj);

    obj = parser->current_obj;

    /* Only function objects carry a RETURNS list */
    switch (obj->kind)
    {
        case MAPRED_MAPPER:
        case MAPRED_TRANSITION:
        case MAPRED_COMBINER:
        case MAPRED_FINALIZER:
            break;

        default:
            XASSERT(false);
            return;
    }

    if (obj->u.function.returns == NULL)
        return;

    if (obj->kind == MAPRED_MAPPER)
        mapred_parse_error(parser, "Duplicate RETURNS for MAP");
    else if (obj->kind == MAPRED_TRANSITION)
        mapred_parse_error(parser, "Duplicate RETURNS for TRANSITION");
    else if (obj->kind == MAPRED_COMBINER)
        mapred_parse_error(parser, "Duplicate RETURNS for CONSOLIDATE");
    else
        mapred_parse_error(parser, "Duplicate RETURNS for FINALIZE");
}
/*
 * parser_begin_keys - called when a KEYS list starts on the current
 * REDUCER; reports an error if the reducer already has keys.
 */
void parser_begin_keys(mapred_parser_t *parser)
{
    XASSERT(parser->current_doc);
    XASSERT(parser->current_obj);
    XASSERT(parser->current_obj->kind == MAPRED_REDUCER);

    if (parser->current_obj->u.reducer.keys != NULL)
        mapred_parse_error(parser, "Duplicate KEYS for REDUCER");
}
/*
 * parser_add_file - append one entry to the file list of the current
 * INPUT, which must be of type FILE or GPFDIST.
 *
 * A NULL or empty value is reported as "Invalid FILE" or
 * "Invalid GPFDIST", matching the INPUT's type, and nothing is added.
 */
void parser_add_file(mapred_parser_t *parser, char *value)
{
    mapred_clist_t  *entry;
    mapred_clist_t **tail;

    XASSERT(parser->current_doc);
    XASSERT(parser->current_obj);
    XASSERT(parser->current_obj->kind == MAPRED_INPUT);
    XASSERT(parser->current_obj->u.input.type == MAPRED_INPUT_FILE ||
            parser->current_obj->u.input.type == MAPRED_INPUT_GPFDIST);

    /* Reject missing values, naming the list they belong to */
    if (value == NULL || value[0] == '\0')
    {
        if (parser->current_obj->u.input.type == MAPRED_INPUT_FILE)
        {
            mapred_parse_error(parser, "Invalid FILE");
            return;
        }
        if (parser->current_obj->u.input.type == MAPRED_INPUT_GPFDIST)
        {
            mapred_parse_error(parser, "Invalid GPFDIST");
            return;
        }
        XASSERT(false);
    }

    /* Todo: improved regex checking on files */

    /* Build the list node */
    entry = malloc(sizeof(mapred_clist_t));
    entry->value = copyscalar(value);
    entry->next = (mapred_clist_t *) NULL;

    /* Walk to the terminating link and hook the node in there */
    tail = &parser->current_obj->u.input.files;
    while (*tail != NULL)
        tail = &(*tail)->next;
    *tail = entry;
}
/*
 * parser_add_column - Append one COLUMNS entry to the current INPUT object.
 *
 * 'value' must have one of two forms, with tokens separated by spaces, tabs
 * or carriage returns:
 *    1) <name>
 *    2) <name> <datatype>
 * The datatype defaults to "text" when omitted.  The string is tokenized in
 * place with strtok_r(), so the caller's buffer is modified.
 *
 * A NULL, empty, delimiter-only, or more-than-two-token value is reported
 * through mapred_parse_error() as "Invalid COLUMNS" and nothing is added.
 * Otherwise copies of the name and type are appended to u.input.columns.
 */
void parser_add_column(mapred_parser_t *parser, char *value)
{
	mapred_plist_t *newitem;
	mapred_plist_t *plist;
	char	   *name,
			   *type,
			   *tokenizer;

	XASSERT(parser->current_doc);
	XASSERT(parser->current_obj);
	XASSERT(parser->current_obj->kind == MAPRED_INPUT);

	/* Verify the new column */
	if (!value || strlen(value) == 0)
	{
		mapred_parse_error(parser, "Invalid COLUMNS");
		return;
	}

	name = strtok_r(value, " \t\r", &tokenizer);

	/*
	 * A value consisting solely of delimiters produces no token at all.
	 * Without this check the NULL name would reach copyscalar() below.
	 */
	if (!name)
	{
		mapred_parse_error(parser, "Invalid COLUMNS");
		return;
	}

	type = strtok_r(NULL, " \t\r", &tokenizer);
	if (!type)
		type = "text";			/* type defaults to 'text' */

	/* double check that there's nothing else */
	if (strtok_r(NULL, " \t\r", &tokenizer))
	{
		mapred_parse_error(parser, "Invalid COLUMNS");
		return;
	}

	/* Allocate the new list item */
	newitem = malloc(sizeof(mapred_plist_t));
	newitem->name = copyscalar(name);
	newitem->type = copyscalar(type);
	newitem->next = (mapred_plist_t *) NULL;

	/* Add the new item into the last slot of the list */
	plist = parser->current_obj->u.input.columns;
	while (plist && plist->next)
		plist = plist->next;
	if (plist)
		plist->next = newitem;
	else
		parser->current_obj->u.input.columns = newitem;
}
/*
 * parser_add_parameter - Append one PARAMETERS entry to the current function
 * object (MAP, TRANSITION, CONSOLIDATE or FINALIZE).
 *
 * 'value' must have one of two forms, with tokens separated by spaces, tabs
 * or carriage returns:
 *    1) <name>
 *    2) <name> <datatype>
 * The datatype defaults to "text" when omitted.  The string is tokenized in
 * place with strtok_r(), so the caller's buffer is modified.
 *
 * A NULL, empty, delimiter-only, or more-than-two-token value is reported
 * through mapred_parse_error() as "Invalid PARAMETERS" and nothing is added.
 * Otherwise copies of the name and type are appended to
 * u.function.parameters.
 */
void parser_add_parameter(mapred_parser_t *parser, char *value)
{
	mapred_plist_t *newitem;
	mapred_plist_t *plist;
	char	   *name,
			   *type,
			   *tokenizer;

	XASSERT(parser->current_doc);
	XASSERT(parser->current_obj);
	XASSERT(parser->current_obj->kind == MAPRED_MAPPER ||
			parser->current_obj->kind == MAPRED_TRANSITION ||
			parser->current_obj->kind == MAPRED_COMBINER ||
			parser->current_obj->kind == MAPRED_FINALIZER);

	/* Verify the new parameter */
	if (!value || strlen(value) == 0)
	{
		mapred_parse_error(parser, "Invalid PARAMETERS");
		return;
	}

	name = strtok_r(value, " \t\r", &tokenizer);

	/*
	 * A value consisting solely of delimiters produces no token at all.
	 * Without this check the NULL name would reach copyscalar() below.
	 */
	if (!name)
	{
		mapred_parse_error(parser, "Invalid PARAMETERS");
		return;
	}

	type = strtok_r(NULL, " \t\r", &tokenizer);
	if (!type)
		type = "text";			/* type defaults to 'text' */

	/* double check that there's nothing else */
	if (strtok_r(NULL, " \t\r", &tokenizer))
	{
		mapred_parse_error(parser, "Invalid PARAMETERS");
		return;
	}

	/* Allocate the new list item */
	newitem = malloc(sizeof(mapred_plist_t));
	newitem->name = copyscalar(name);
	newitem->type = copyscalar(type);
	newitem->next = (mapred_plist_t *) NULL;

	/* Add the new item into the last slot of the list */
	plist = parser->current_obj->u.function.parameters;
	while (plist && plist->next)
		plist = plist->next;
	if (plist)
		plist->next = newitem;
	else
		parser->current_obj->u.function.parameters = newitem;
}
/*
 * parser_add_return - Append one RETURNS entry to the current function
 * object (MAP, TRANSITION, CONSOLIDATE or FINALIZE).
 *
 * 'value' must have one of two forms, with tokens separated by spaces, tabs
 * or carriage returns:
 *    1) <name>
 *    2) <name> <datatype>
 * The datatype defaults to "text" when omitted.  The string is tokenized in
 * place with strtok_r(), so the caller's buffer is modified.
 *
 * A NULL, empty, delimiter-only, or more-than-two-token value is reported
 * through mapred_parse_error() as "Invalid RETURNS" and nothing is added.
 * Otherwise copies of the name and type are appended to u.function.returns.
 */
void parser_add_return(mapred_parser_t *parser, char *value)
{
	mapred_plist_t *newitem;
	mapred_plist_t *plist;
	char	   *name,
			   *type,
			   *tokenizer;

	XASSERT(parser->current_doc);
	XASSERT(parser->current_obj);
	XASSERT(parser->current_obj->kind == MAPRED_MAPPER ||
			parser->current_obj->kind == MAPRED_TRANSITION ||
			parser->current_obj->kind == MAPRED_COMBINER ||
			parser->current_obj->kind == MAPRED_FINALIZER);

	/* Verify the new return */
	if (!value || strlen(value) == 0)
	{
		mapred_parse_error(parser, "Invalid RETURNS");
		return;
	}

	name = strtok_r(value, " \t\r", &tokenizer);

	/*
	 * A value consisting solely of delimiters produces no token at all.
	 * Without this check the NULL name would reach copyscalar() below.
	 */
	if (!name)
	{
		mapred_parse_error(parser, "Invalid RETURNS");
		return;
	}

	type = strtok_r(NULL, " \t\r", &tokenizer);
	if (!type)
		type = "text";			/* type defaults to 'text' */

	/* double check that there's nothing else */
	if (strtok_r(NULL, " \t\r", &tokenizer))
	{
		mapred_parse_error(parser, "Invalid RETURNS");
		return;
	}

	/* Allocate the new list item */
	newitem = malloc(sizeof(mapred_plist_t));
	newitem->name = copyscalar(name);
	newitem->type = copyscalar(type);
	newitem->next = (mapred_plist_t *) NULL;

	/* Add the new item into the last slot of the list */
	plist = parser->current_obj->u.function.returns;
	while (plist && plist->next)
		plist = plist->next;
	if (plist)
		plist->next = newitem;
	else
		parser->current_obj->u.function.returns = newitem;
}
/*
 * parser_add_key - Append one entry to the KEYS list of the current REDUCER.
 *
 * A NULL or empty 'value' is reported as "Invalid KEYS" and nothing is
 * added.  Otherwise a copy of 'value' is linked in at the end of
 * u.reducer.keys.
 */
void parser_add_key(mapred_parser_t *parser, char *value)
{
	mapred_clist_t **tail;
	mapred_clist_t *node;

	XASSERT(parser->current_doc);
	XASSERT(parser->current_obj);
	XASSERT(parser->current_obj->kind == MAPRED_REDUCER);

	/* A key must be a non-empty string */
	if (!value || value[0] == '\0')
	{
		mapred_parse_error(parser, "Invalid KEYS");
		return;
	}

	/* Build the new node holding a private copy of the value */
	node = malloc(sizeof(mapred_clist_t));
	node->value = copyscalar(value);
	node->next = (mapred_clist_t *) NULL;

	/* Walk to the terminating link of the list and hook the node in there */
	tail = &parser->current_obj->u.reducer.keys;
	while (*tail)
		tail = &(*tail)->next;
	*tail = node;
}
/*
* mapred_dump_yaml - Given an object, dump it's YAML representation.
* This is the inverse of parsing
*
* (*) Could be re-written to avoid code duplication issues.
*/
void mapred_dump_yaml(mapred_object_t *obj)
{
char *ckind = NULL;
if (!obj)
return;
switch (obj->kind)
{
case MAPRED_DOCUMENT:
printf("---\n");
/* Dumping the current version */
printf("VERSION: 1.0.0.3\n");
if (obj->u.document.database)
printf("DATABASE: %s\n", obj->u.document.database);
if (obj->u.document.user)
printf("USER: %s\n", obj->u.document.user);
if (obj->u.document.host)
printf("HOST: %s\n", obj->u.document.host);
if (obj->u.document.port > 0)
printf("PORT: %d\n", obj->u.document.port);
if (obj->u.document.flags & mapred_document_defines)
{
mapred_olist_t *sub;
printf("DEFINE:\n");
for (sub = obj->u.document.objects; sub; sub = sub->next)
if (sub->object->kind != MAPRED_EXECUTION)
mapred_dump_yaml(sub->object);
}
if (obj->u.document.flags & mapred_document_executes)
{
mapred_olist_t *sub;
printf("EXECUTE:\n");
for (sub = obj->u.document.objects; sub; sub = sub->next)
if (sub->object->kind == MAPRED_EXECUTION)
mapred_dump_yaml(sub->object);
}
break;
case MAPRED_INPUT:
printf(" - INPUT:\n");
if (obj->name)
printf(" NAME: %s\n", obj->name);
if (obj->u.input.columns)
{
mapred_plist_t *plist;
printf(" COLUMNS:\n");
for (plist = obj->u.input.columns; plist; plist = plist->next)
printf(" - %s %s\n", plist->name, plist->type);
}
if (obj->u.input.delimiter)
printf(" DELIMITER: %s\n", obj->u.input.delimiter);
if (obj->u.input.encoding)
printf(" ENCODING: %s\n", obj->u.input.encoding);
switch (obj->u.input.format)
{
case MAPRED_FORMAT_NONE:
break;
case MAPRED_FORMAT_TEXT:
printf(" FORMAT: TEXT\n");
break;
case MAPRED_FORMAT_CSV:
printf(" FORMAT: CSV\n");
break;
default:
XASSERT(false);
}
switch (obj->u.input.type)
{
case MAPRED_INPUT_NONE:
break;
case MAPRED_INPUT_FILE:
{
mapred_clist_t *clist;
printf(" FILE:\n");
for (clist = obj->u.input.files; clist; clist = clist->next)
printf(" - %s\n", clist->value);
break;
}
case MAPRED_INPUT_GPFDIST:
printf(" GPFDIST: %s\n", obj->u.input.desc);
break;
case MAPRED_INPUT_TABLE:
printf(" TABLE: %s\n", obj->u.input.desc);
break;
case MAPRED_INPUT_QUERY:
printf(" QUERY: |\n");
printf(" %s\n", obj->u.input.desc);
break;
case MAPRED_INPUT_EXEC:
printf(" EXEC: %s\n", obj->u.input.desc);
break;
default:
XASSERT(false);
}
break;
case MAPRED_OUTPUT:
printf(" - OUTPUT:\n");
if (obj->name)
printf(" NAME: %s\n", obj->name);
switch (obj->u.output.mode)
{
case MAPRED_OUTPUT_MODE_NONE:
break;
case MAPRED_OUTPUT_MODE_REPLACE:
printf(" MODE: REPLACE\n");
break;
case MAPRED_OUTPUT_MODE_APPEND:
printf(" MODE: APPEND\n");
break;
default:
XASSERT(false);
}
switch (obj->u.output.type)
{
case MAPRED_OUTPUT_NONE:
break;
case MAPRED_OUTPUT_FILE:
printf(" FILE: %s\n", obj->u.output.desc);
break;
case MAPRED_OUTPUT_TABLE:
printf(" TABLE: %s\n", obj->u.output.desc);
break;
default:
XASSERT(false);
}
break;
case MAPRED_MAPPER:
ckind = "MAP";
/* fallthrough */
case MAPRED_TRANSITION:
if (!ckind)
ckind = "TRANSITION";
/* fallthrough */
case MAPRED_COMBINER:
if (!ckind)
ckind = "CONSOLIDATE";
/* fallthrough */
case MAPRED_FINALIZER:
if (!ckind)
ckind = "FINALIZE";
printf(" - %s:\n", ckind);
if (obj->name)
printf(" NAME: %s\n", obj->name);
if (obj->u.function.parameters)
{
mapred_plist_t *plist;
printf(" PARAMETERS:\n");
for (plist = obj->u.function.parameters; plist;
plist = plist->next)
printf(" - %s %s\n", plist->name, plist->type);
}
if (obj->u.function.returns)
{
mapred_plist_t *plist;
printf(" RETURNS:\n");
for (plist = obj->u.function.returns; plist;
plist = plist->next)
printf(" - %s %s\n", plist->name, plist->type);
}
switch (obj->u.function.mode)
{
case MAPRED_MODE_NONE:
break;
case MAPRED_MODE_SINGLE:
printf(" MODE: SINGLE\n");
break;
case MAPRED_MODE_MULTI:
printf(" MODE: MULTI\n");
break;
case MAPRED_MODE_ACCUMULATED:
printf(" MODE: ACCUMULATED\n");
break;
case MAPRED_MODE_WINDOWED:
printf(" MODE: WINDOWED\n");
break;
default:
printf(" MODE: UNKNOWN\n");
break;
}
if (obj->u.function.flags)
{
printf(" OPTIMIZE: ");
if (obj->u.function.flags & mapred_function_strict)
printf("STRICT ");
if (obj->u.function.flags & mapred_function_immutable)
printf("IMMUTABLE ");
if (obj->u.function.flags & mapred_function_unordered)
printf("UNORDERED ");
printf("\n");
}
if (obj->u.function.language)
printf(" LANGUAGE: %s\n", obj->u.function.language);
if (obj->u.function.body)
{
printf(" FUNCTION: |\n");
printf(" %s\n", obj->u.function.body);
}
break;
case MAPRED_REDUCER:
printf(" - REDUCE:\n");
if (obj->name)
printf(" NAME: %s\n", obj->name);
if (obj->u.reducer.transition.name)
printf(" TRANSITION: %s\n",
obj->u.reducer.transition.name);
if (obj->u.reducer.combiner.name)
printf(" CONSOLIDATE: %s\n",
obj->u.reducer.combiner.name);
if (obj->u.reducer.finalizer.name)
printf(" FINALIZE: %s\n",
obj->u.reducer.finalizer.name);
if (obj->u.reducer.initialize)
printf(" INITIALIZE: %s\n",
obj->u.reducer.initialize);
if (obj->u.reducer.keys)
{
mapred_clist_t *clist;
printf(" KEYS: |\n");
for (clist = obj->u.reducer.keys; clist; clist = clist->next)
printf(" - %s\n", clist->value);
}
if (obj->u.reducer.ordering)
{
mapred_clist_t *clist;
printf(" ORDERING: |\n");
for (clist = obj->u.reducer.ordering; clist; clist = clist->next)
printf(" - %s\n", clist->value);
}
break;
case MAPRED_TASK:
case MAPRED_EXECUTION:
if (obj->u.task.execute)
printf(" - RUN:\n");
else
printf(" - TASK:\n");
if (obj->name)
printf(" NAME: %s\n", obj->name);
if (obj->u.task.input.name)
printf(" SOURCE: %s\n", obj->u.task.input.name);
if (obj->u.task.mapper.name)
printf(" MAP: %s\n", obj->u.task.mapper.name);
if (obj->u.task.reducer.name)
printf(" REDUCE: %s\n", obj->u.task.reducer.name);
if (obj->u.task.output.name)
printf(" TARGET: %s\n", obj->u.task.output.name);
break;
case MAPRED_NO_KIND:
default:
XRAISE(MAPRED_PARSE_INTERNAL,
"Unknown object type");
}
}
int mapred_verify_object(mapred_parser_t *parser, mapred_object_t *obj)
{
char *name;
int error = NO_ERROR;
XASSERT(obj);
/* Verify that all required fields are present and valid */
name = obj->name ? obj->name : "unnamed";
switch (obj->kind)
{
case MAPRED_DOCUMENT:
/*
* If there is a version on the document then it should have
* been validated by parser_set_version()
*/
if (!obj->u.document.version)
{
error = mapred_obj_error(obj, "Missing VERSION",
parser->doc_number);
}
break;
case MAPRED_INPUT:
/* Validate required fields */
if (!obj->name)
error = mapred_obj_error(obj, "Missing NAME");
if (obj->u.input.type == MAPRED_INPUT_NONE)
error = mapred_obj_error(obj,
"Missing FILE, GPFDIST, TABLE, QUERY, or EXEC");
/* set default values */
if (error == NO_ERROR)
{
if (!obj->u.input.columns)
{
obj->u.input.columns = malloc(sizeof(mapred_plist_t));
obj->u.input.columns->name = copyscalar("value");
obj->u.input.columns->type = copyscalar("text");
obj->u.input.columns->next = NULL;
}
if (!obj->u.input.columns->next &&
!obj->u.input.delimiter)
{
obj->u.input.delimiter = copyscalar("off");
}
}
break;
case MAPRED_OUTPUT:
if (!obj->name)
error = mapred_obj_error(obj, "Missing NAME");
if (obj->u.output.type == MAPRED_OUTPUT_NONE)
error = mapred_obj_error(obj, "Missing FILE or TABLE");
break;
case MAPRED_MAPPER:
case MAPRED_TRANSITION:
case MAPRED_COMBINER:
case MAPRED_FINALIZER:
if (!obj->name)
error = mapred_obj_error(obj, "Missing NAME");
/*
* We now support "builtin" functions, which are specified by a lack
* of an implementation language. If a language is specified then
* a function body is still required. If a language is not specified
* then the function body just defaults to the name of the function.
*/
if (obj->name && !obj->u.function.language && !obj->u.function.body)
obj->u.function.body = copyscalar(obj->name);
if (obj->u.function.language && !obj->u.function.body)
error = mapred_obj_error(obj, "Missing FUNCTION");
/*
* LIBRARY is required for "C" language functions.
* LIBRARY is invalid for any other language.
*
* It would be good to verify that LIBRARY is not used in
* older YAML formats, but that is difficult given the current
* structure of the code.
*/
if (obj->u.function.language)
{
if (obj->u.function.library)
{
if (strcasecmp("C", obj->u.function.language))
{
error = mapred_obj_error(obj, "LIBRARY is invalid for "
"%s LANGUAGE functions",
obj->u.function.language);
}
}
else if (!strcasecmp("C", obj->u.function.language))
{
error = mapred_obj_error(obj, "Missing LIBRARY");
}
/*
* Don't bother filling in default arguments if we already have
* an error.
*/
if (error)
break;
/*
* Set default values.
* For builtin functions we delay this so that we can lookup the
* function in the catalog to determine the defaults.
*/
if (!obj->u.function.parameters)
{
const char *name = default_parameter_names[obj->kind][0];
name = default_parameter_names[obj->kind][0];
obj->u.function.parameters = malloc(sizeof(mapred_plist_t));
obj->u.function.parameters->type = copyscalar("text");
obj->u.function.parameters->name = copyscalar(name);
obj->u.function.parameters->next = NULL;
name = default_parameter_names[obj->kind][1];
if (name)
{
obj->u.function.parameters->next = malloc(sizeof(mapred_plist_t));
obj->u.function.parameters->next->type = copyscalar("text");
obj->u.function.parameters->next->name = copyscalar(name);
obj->u.function.parameters->next->next = NULL;
}
}
else
{
switch (obj->kind)
{
case MAPRED_TRANSITION:
if (!obj->u.function.parameters->next)
{
error = mapred_obj_error(
obj,
"requires at least 2 input parameters [state, arg1, ...]"
);
}
break;
case MAPRED_COMBINER:
if (!obj->u.function.parameters->next ||
obj->u.function.parameters->next->next)
{
error = mapred_obj_error(
obj,
"requires exactly 2 input parameters [state1, state2]"
);
}
break;
case MAPRED_FINALIZER:
if (obj->u.function.parameters->next)
{
error = mapred_obj_error(
obj,
"requires exactly 1 input parameter [state]"
);
}
break;
case MAPRED_MAPPER:
default:
break;
}
}
if (!obj->u.function.returns)
{
const char *name = default_return_names[obj->kind][0];
obj->u.function.returns = malloc(sizeof(mapred_plist_t));
obj->u.function.returns->type = copyscalar("text");
obj->u.function.returns->name = copyscalar(name);
obj->u.function.returns->next = NULL;
name = default_return_names[obj->kind][1];
if (name)
{
obj->u.function.returns->next = malloc(sizeof(mapred_plist_t));
obj->u.function.returns->next->type = copyscalar("text");
obj->u.function.returns->next->name = copyscalar(name);
obj->u.function.returns->next->next = NULL;
}
}
else if (obj->kind == MAPRED_TRANSITION ||
obj->kind == MAPRED_COMBINER)
{
if (obj->u.function.returns->next)
{
error = mapred_obj_error(
obj,
"requires exactly one output parameter [state]"
);
}
}
/* Set default mode: depends on type of function */
if (obj->u.function.mode == MAPRED_MODE_NONE)
{
if (obj->kind == MAPRED_TRANSITION ||
obj->kind == MAPRED_COMBINER)
{
obj->u.function.mode = MAPRED_MODE_SINGLE;
}
else
{
obj->u.function.mode = MAPRED_MODE_MULTI;
}
}
}
break;
case MAPRED_REDUCER:
if (!obj->name)
error = mapred_obj_error(obj, "Missing NAME");
if (!obj->u.reducer.transition.name)
error = mapred_obj_error(obj, "Missing TRANSITION");
/*
* Will verify that functions are valid for reducer input after we
* have resolved the pointers.
*/
/*
* It would be good to verify that ORDERING is not used in
* older YAML formats, but that is difficult given the current
* structure of the code.
*/
/*
* ORDERING and COMBINER are incompatible
*/
if (obj->u.reducer.ordering != NULL &&
obj->u.reducer.combiner.name)
{
error = mapred_obj_error(obj,
"REDUCERS cannot specify both a COMBINER "
"function and an ORDERING specification");
}
/* Setup default "keys" */
if (!obj->u.reducer.keys)
{
obj->u.reducer.keys = malloc(sizeof(mapred_clist_t));
obj->u.reducer.keys->value = copyscalar("key");
obj->u.reducer.keys->next = malloc(sizeof(mapred_clist_t));
obj->u.reducer.keys->next->next = NULL;
obj->u.reducer.keys->next->value = copyscalar("*");
}
break;
case MAPRED_TASK:
if (!obj->name)
error = mapred_obj_error(obj, "Missing NAME");
/* Fallthrough */
case MAPRED_EXECUTION:
if (!obj->u.task.input.name)
error = mapred_obj_error(obj, "Missing SOURCE");
/* IDENTITY Mappers and Reducers */
if (obj->u.task.mapper.name &&
!strcasecmp("IDENTITY", obj->u.task.mapper.name))
{
free(obj->u.task.mapper.name);
obj->u.task.mapper.name = NULL;
}
if (obj->u.task.reducer.name &&
!strcasecmp("IDENTITY", obj->u.task.reducer.name))
{
free(obj->u.task.reducer.name);
obj->u.task.reducer.name = NULL;
}
/* STDOUT Output */
if (obj->u.task.output.name &&
!strcasecmp("STDOUT", obj->u.task.output.name))
{
free(obj->u.task.output.name);
obj->u.task.output.name = NULL;
}
break;
case MAPRED_NO_KIND:
default:
XASSERT(false);
}
return error;
}
相关信息
相关文章
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦